Stateful subscription using streams


(Igor Baltiyskiy) #1

Problem: I have a source of events that are pushed into it as they happen in the system (through Source.queue()). The stream then runs through scan() and is to be consumed by an unknown number of subscribers that arrive at arbitrary points in time. For each subscriber, I want to give it a summary of what it has missed (by re-creating the missed events from the statistics collected by scan()) and then keep feeding it new events as they arrive.

My first move was to just place a BroadcastHub after the scan(), but that doesn’t work: once an existing subscriber has drained it, a new subscriber doesn’t get anything until the next event happens. I need the new subscriber to get the missed events right away.

Is there a way to do it using Akka Streams, short of writing my own BroadcastHub?


(Igor Baltiyskiy) #2

Note: this might be similar to the topic “Question about stateful multiplexed protocols on top of akka-streams”, but something seems off there: the author talks about GraphStage[FlowShape] and MergeHub, which don’t look like what I need.


(Igor Baltiyskiy) #3

For now, I’m trying to solve this using two custom GraphStages: RepeatLast and Dedup.
RepeatLast stores the last pulled element and exposes a callback that causes it to push this element again.
Dedup also stores the last pulled element and works like a filter: it doesn’t push an element if it is the same object (referentially) as the stored one.
The whole graph is then as follows:

source -> scan -> repeatLast -> ( broadcastHub  ) -> dedup -> consumer

This is a kludge because I have to introduce superfluous elements and then remove them again. Please tell me if there’s a better solution.
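
To make the intended behaviour of the two stages concrete, here is a minimal plain-Kotlin model of them. It is only a sketch: it does not use Akka at all, it ignores back-pressure, and the class names RepeatLastModel and DedupModel are made up for illustration. The real stages would be GraphStages with the same bookkeeping in their handlers.

```kotlin
/** Remembers the last element that passed through so it can be emitted again on request. */
class RepeatLastModel<T : Any> {
    private var last: T? = null

    /** Called for every upstream element; stores it and passes it through unchanged. */
    fun onElement(element: T): T {
        last = element
        return element
    }

    /** Models the callback: returns the stored element again, or null if nothing has been seen yet. */
    fun requestElement(): T? = last
}

/** Drops an element when it is the very same object (===) as the previous one. */
class DedupModel<T : Any> {
    private var last: T? = null

    /** Returns the element if it should be pushed downstream, or null if it is a repeat. */
    fun onElement(element: T): T? {
        if (element === last) return null
        last = element
        return element
    }
}
```

A consumer that has already seen an element gets null from DedupModel when RepeatLastModel re-emits it, while a consumer with a fresh DedupModel lets the same element through. That is the whole point of pairing the two.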


(Igor Baltiyskiy) #4

Here’s what it looks like (it’s Kotlin, using the Java API plus a few of our own extension methods):

    val (repeat, bhub) = Source.range(1, 10)
      .throttle(10, 1.seconds.java)
      // Prevent completion after 10 elements
      .concat(Source.maybe())
      .repeatLast()
      .log(log, "source")
      .toMat(broadcastHub(16), Keep.both())
      .run(materializer)
    // Important that this works even after the elements stop coming
    val deadline = 2.seconds.deadline
    var i = 0
    val futures = generateSequence {
      if (!deadline.isOverdue) {
        val id = "consumer-${i++}"
        Thread.sleep(100)
        bhub
          .dedup()
          .prefixAndTail(1)
          .flatMapConcat { (e, tail) ->
            Source.range(1, e.first()).concat(tail)
          }
          .log(log, id)
          .take(10)
          .toMat(Sink.seq(), Keep.right())
          .mapMaterializedValue { m ->
            // Use this as a materialization hook
            repeat.requestElement()
            m
          }
          .run(materializer)
          .toCompletableFuture() to id
      } else null
    }.toList()
    futures.forEach { (f, id) ->
      assertThat(id, f, inFuture(contains(*((1..10).toList().toTypedArray()))))
    }
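
For readers not familiar with prefixAndTail, the catch-up step in the consumer above can be modelled on plain Kotlin sequences. This is a sketch under the same assumption as the test (an element n stands for "events 1..n have happened"); the function name catchUp is hypothetical and nothing here touches Akka.

```kotlin
/**
 * Takes the elements a late subscriber actually receives and rebuilds what it missed:
 * the first received element n is expanded into 1..n, then the remaining elements follow as-is.
 */
fun catchUp(received: Sequence<Int>): Sequence<Int> = sequence {
    val iterator = received.iterator()
    if (!iterator.hasNext()) return@sequence // nothing received, nothing to rebuild
    val first = iterator.next()
    yieldAll(1..first)  // re-created events, including the first received one
    yieldAll(iterator)  // live tail, unchanged
}
```

For example, a subscriber that joins when the hub emits 4 and then sees 5 and 6 ends up with 1, 2, 3, 4, 5, 6.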

(Igor Baltiyskiy) #5

Note that the above solution doesn’t work, for the simple reason that there’s a race between materialization of the consumer stream and BroadcastHub registering the consumer. Even if I used Source.lazily() instead of mapMaterializedValue, for instance, that wouldn’t help, because starting the stream (and any stage in it) is also not synchronized with BroadcastHub registering the consumer. So the requested element can be published by BroadcastHub before the consumer is registered there, in which case the consumer never sees it.
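
What is missing is an ordering guarantee: registering the consumer and handing it the current state have to happen as one atomic step with respect to publishing. The following plain-Kotlin sketch shows that guarantee in isolation. It is not an Akka Streams solution and not a replacement for BroadcastHub; the class SnapshotHub and its methods are hypothetical, and it uses a simple lock instead of stream back-pressure.

```kotlin
/**
 * Keeps a folded state (like scan()) and a list of subscribers.
 * publish() and subscribe() take the same lock, so a new subscriber
 * gets a snapshot and then every later event, with nothing lost in between.
 */
class SnapshotHub<S, E>(initial: S, private val fold: (S, E) -> S) {
    private val lock = Any()
    private var state: S = initial
    private val subscribers = mutableListOf<(E) -> Unit>()

    /** Folds the event into the state and delivers it to all current subscribers. */
    fun publish(event: E) {
        synchronized(lock) {
            state = fold(state, event)
            for (subscriber in subscribers) subscriber(event)
        }
    }

    /** Delivers the current state, then registers for future events, all under one lock. */
    fun subscribe(onSnapshot: (S) -> Unit, onEvent: (E) -> Unit) {
        synchronized(lock) {
            onSnapshot(state)
            subscribers.add(onEvent)
        }
    }
}
```

With a hub like this, a subscriber that joins after events 1 and 2 (folded by summing) first receives the snapshot 3 and then only events published after it joined. The RepeatLast approach above cannot promise this, because the request for the repeated element and the registration in BroadcastHub are two unsynchronized steps.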