Problem: there’s a source that emits events as they happen in the system (they are offered through Source.queue()). Its output goes through scan(), and the result is consumed by an unknown number of subscribers that can join at arbitrary points in time. For each subscriber, I want to first give it a summary of what it has missed (by re-creating the missed events from the statistics collected by scan()) and then keep feeding it new events as they arrive.
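To make the idea of re-creating missed events from statistics concrete, here is a minimal plain-Kotlin sketch with no Akka involved. It assumes, as in the test code below, that events are the consecutive integers 1, 2, 3 and so on, and that the only statistic scan() keeps is the number of events seen. `Stats`, `accumulate`, `replayMissed` and `observedBy` are hypothetical names for illustration only.

```kotlin
// Hypothetical statistics accumulated by scan(): just the number of events seen.
data class Stats(val count: Int)

// Fold one event into the running statistics (the role of the scan() step).
@Suppress("UNUSED_PARAMETER")
fun accumulate(stats: Stats, event: Int): Stats = Stats(stats.count + 1)

// Re-create the events a late subscriber missed, from the statistics alone.
// Assumes events are the consecutive integers 1, 2, 3, ...
fun replayMissed(stats: Stats): List<Int> = (1..stats.count).toList()

// What a subscriber that joins after `joinAfter` events would observe:
// the replayed summary followed by the live events it did not miss.
fun observedBy(live: List<Int>, joinAfter: Int): List<Int> {
    // Like Akka's scan(), Kotlin's scan() emits the initial state first,
    // so index k holds the statistics after k events.
    val history = live.scan(Stats(0), ::accumulate)
    return replayMissed(history[joinAfter]) + live.drop(joinAfter)
}

fun main() {
    println(observedBy((1..10).toList(), joinAfter = 4)) // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
```

The point is that a late subscriber ends up with the same sequence as an early one, even though the first part of it is synthesized rather than replayed from a buffer.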
My first move was to just place a BroadcastHub after the scan(), but that obviously doesn’t work: an existing subscriber can drain it, and then a new subscriber gets nothing until the next event happens. I need the new subscriber to get the summary right away.
Is there a way to do it using Akka Streams, short of writing my own BroadcastHub?
For now, I’m trying to solve this with two custom GraphStages: RepeatLast and Dedup.
RepeatLast stores the last pulled element and exposes a callback that makes it push this element again.
Dedup also stores the last pulled element and works like a filter: it doesn’t push an element if it is the same object (referentially) as the stored one.
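The intended interplay of the two stages can be modelled without Akka. The sketch below is a single-threaded, plain-Kotlin model, not GraphStage code: `RepeatLast` and `Dedup` are simple classes standing in for the stages, a list of `Dedup` consumers stands in for BroadcastHub, and `Event`, `onElement` and `simulate` are hypothetical names. It shows that a repeated element is dropped by a consumer that already saw that exact object but does reach a consumer that joined late.

```kotlin
// Plain-Kotlin model of the two stages' behavior; NOT Akka GraphStage code.
class RepeatLast<T : Any>(private val downstream: (T) -> Unit) {
    private var last: T? = null

    // Every element pulled from upstream is remembered and passed on.
    fun onElement(e: T) {
        last = e
        downstream(e)
    }

    // The callback: push the stored element once more (no-op if none yet).
    fun requestElement() {
        last?.let(downstream)
    }
}

class Dedup<T : Any>(private val downstream: (T) -> Unit) {
    private var last: T? = null

    // Filter: drop an element that is the very same object as the previous one.
    fun onElement(e: T) {
        if (e === last) return
        last = e
        downstream(e)
    }
}

// Wrapper class so that referential equality is meaningful (no boxed-Int caching).
data class Event(val n: Int)

fun simulate(): Pair<List<Event>, List<Event>> {
    val early = mutableListOf<Event>()
    val late = mutableListOf<Event>()
    val consumers = mutableListOf(Dedup<Event> { early += it })
    // Stand-in for BroadcastHub: fan each element out to every registered consumer.
    val repeat = RepeatLast<Event> { e -> consumers.forEach { it.onElement(e) } }

    repeat.onElement(Event(1))               // early sees Event(1)
    consumers += Dedup<Event> { late += it } // a late subscriber registers
    repeat.requestElement()                  // early drops the repeat, late gets Event(1)
    repeat.onElement(Event(2))               // both see Event(2)
    return early to late
}

fun main() {
    val (early, late) = simulate()
    println(early) // [Event(n=1), Event(n=2)]
    println(late)  // [Event(n=1), Event(n=2)]
}
```

In this model registration is synchronous, which is exactly the assumption that does not hold for the real BroadcastHub.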
The whole graph then looks as follows (it’s Kotlin, using the Java API plus a few of our own extension methods):
```kotlin
val (repeat, bhub) = Source.range(1, 10)
    .throttle(10, 1.seconds.java)
    // Prevent completion after 10 elements
    .concat(Source.maybe())
    .repeatLast()
    .log(log, "source")
    .toMat(broadcastHub(16), Keep.both())
    .run(materializer)

// Important that this works even after the elements stop coming
val deadline = 2.seconds.deadline
var i = 0
val futures = generateSequence {
    if (!deadline.isOverdue) {
        val id = "consumer-${i++}"
        Thread.sleep(100)
        bhub
            .dedup()
            .prefixAndTail(1)
            .flatMapConcat { (e, tail) ->
                Source.range(1, e.first()).concat(tail)
            }
            .log(log, id)
            .take(10)
            .toMat(Sink.seq(), Keep.right())
            .mapMaterializedValue { m ->
                // Use this as a materialization hook
                repeat.requestElement()
                m
            }
            .run(materializer)
            .toCompletableFuture() to id
    } else null
}.toList()

futures.forEach { (f, id) ->
    assertThat(id, f, inFuture(contains(*((1..10).toList().toTypedArray()))))
}
```
Note that the above solution doesn’t work, for a simple reason: there’s a race between the materialization of the consumer stream and BroadcastHub registering the consumer. Using Source.lazily() instead of mapMaterializedValue wouldn’t help either, because starting the stream (or any stage in it) is also not synchronized with BroadcastHub registering the consumer. So BroadcastHub can publish the requested element before the consumer is registered, and the new consumer never sees it.
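The race can be reproduced deterministically in a toy model. The sketch below is plain Kotlin, not Akka: `ToyHub` is a hypothetical stand-in that, like BroadcastHub as described above, only queues a new consumer’s registration and processes it later, and `publish` delivers only to consumers already registered. If the materialization hook fires `requestElement()` (modelled here as a direct `publish`) before the hub processes the registration, the new consumer never receives the repeated element.

```kotlin
// Toy single-threaded model of the ordering problem; NOT Akka code.
class ToyHub<T : Any> {
    private val registered = mutableListOf<MutableList<T>>()
    private val pending = mutableListOf<MutableList<T>>()

    // Materializing a consumer only enqueues its registration;
    // the hub picks it up later.
    fun materializeConsumer(): List<T> {
        val inbox = mutableListOf<T>()
        pending.add(inbox)
        return inbox
    }

    // The hub eventually gets around to registering pending consumers.
    fun processRegistrations() {
        registered.addAll(pending)
        pending.clear()
    }

    // Elements go only to consumers that are registered at this moment.
    fun publish(e: T) {
        registered.forEach { it.add(e) }
    }
}

// Returns what the new consumer received, depending on which side wins the race.
fun newConsumerReceives(hookFiresFirst: Boolean): List<Int> {
    val hub = ToyHub<Int>()
    val inbox = hub.materializeConsumer()
    if (hookFiresFirst) {
        hub.publish(5)             // repeated element published...
        hub.processRegistrations() // ...before the consumer is registered: lost
    } else {
        hub.processRegistrations()
        hub.publish(5)             // consumer already registered: delivered
    }
    return inbox
}

fun main() {
    println(newConsumerReceives(hookFiresFirst = true))  // []
    println(newConsumerReceives(hookFiresFirst = false)) // [5]
}
```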