Akka stream broadcasthub with cache

akka

#1

Dear Hakker,

When using a broadcasthub sink, is there a way to cache the last n element so when a new consumer subscribe the last n elements can be replayed ?

Thanks!


(Johan Andrén) #2

Nothing built in, and no way I can think of that would achieve that by composition other stream operators with the existing BroadcastHub implementation, so I think it’d have to be a new custom graph stage (or potentially modifications on the existing hub but it is quite an intricate machinery already so it’s probably not a good idea to bolt more features onto it)


#3

Ok thanks, I’ll try to implement this if possible !


(Julian Howarth) #4

We do something along these lines albeit with relatively short-lived, low-volume streams but you may be able to adapt to your scenario. The 2 techniques we use are:

  1. Each message contains all the preceding messages’ data as well as its own - just a List that each new data point is added to (in our case this maxes out at about 2000 so memory is not an issue). But you could possibly adapt this so that the input stage to the broadcast hub keeps track of the last n messages, then you output a message containing current data and previous n.

  2. The above allows consumers to see all the data but has the issue that you have to wait for the next message in order to see any data. In our case this would mean waiting up to 5s which wasn’t acceptable. So we have an additional flow of control messages from the consumer back to the input stage of the broadcast hub (via a merge hub) that allow us to request a re-broadcast of data whenever a new consumer connects. Already connected consumers just ignore messages that are flagged as re-broadcasts. The consumer that just connected gets the data pretty much immediately.

Both of the above are implemented with standard streams operators: mergeHub, broadcastHub and statefulMapConcat.

Julian


#5

Thanks,

I think I could implement the first one using a scan that return the max last n messages and the current. That could do the trick!
I will try.


#6

To complete this post, I achieve that this way :

  implicit val system = ActorSystem()
  import system.dispatcher
  implicit val mat = ActorMaterializer()

  val maxSize = 5

  val (queue, tmpSource) = Source.queue[Int](10, backpressure)
    .scan((option.none[Int], Seq.empty[Int])) {
      case ((None, seq), elt) =>
        (Some(elt), seq)
      case ((Some(last), seq), elt) =>
        val l = if (seq.size == 20) {
          seq.dropRight(1) :+ last
        } else {
          seq :+ last
        }
        (Some(elt), l)
    }
    .toMat(BroadcastHub.sink)(Keep.both)
    .run

  val source = tmpSource.statefulMapConcat { () =>
    var first = true
    current => current match {
      case (None, seq) if first =>
        first = false
        seq.toList
      case (Some(elt), seq) if first =>
          first = false
          (seq :+ elt).toList
      case (Some(elt), _) =>
          List(elt)
    }
  }

  val s1 = source.take(20).runWith(Sink.seq)
  result(Future.sequence((0 to 10).map(queue.offer)), 1.second)
  val s2 = source.take(20).runWith(Sink.seq)
  result(Future.sequence((11 to 20).map(queue.offer)), 1.second)

  println(s"S 1: ${result(s1, 5.second)}")
  println(s"S 2: ${result(s2, 5.second)}")
  println(s"S 3: ${result(source.take(20).runWith(Sink.seq), 5.second)}")