I’m playing around with `MergeHub` and `BroadcastHub` in Akka Streams, but I am a bit confused by the process of constructing the stream.
Consider the example for BroadcastHub from the documentation:
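```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

// Akka 2.6+: an implicit ActorSystem in scope also provides the Materializer
implicit val system: ActorSystem = ActorSystem("broadcast-hub-example")

val producer: Source[String, Cancellable] = Source.tick(1.second, 1.second, "New message")
val sink: Sink[String, Source[String, NotUsed]] = BroadcastHub.sink(bufferSize = 256)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] = producer.toMat(sink)(Keep.right)

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
val fromProducer: Source[String, NotUsed] = runnableGraph.run()

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))
```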
The first line creates a `Source` for the entire stream, which produces the string `"New message"` once per second.
Conceptually speaking, attaching a `BroadcastHub` as a `Sink` to this stream means you can extend it at runtime. So while the stream is running, you can add additional `Sink`s to it. Makes perfect sense.
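A minimal sketch of attaching a consumer while the hub is already running, assuming Akka 2.6.x (where an implicit `ActorSystem` also supplies the `Materializer`) and a script or worksheet setting. The 100 ms tick interval, the 500 ms delay and the names `ticker`, `received1` and `received2` are illustrative choices, not taken from the documentation. `Keep.both` is used instead of `Keep.right` so the producer's `Cancellable` is also available to stop the ticker at the end.

```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, Source}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("late-consumer-sketch")
import system.dispatcher // ExecutionContext needed by scheduleOnce

// Keep.both: keep the ticker's Cancellable AND the hub's consumer-side Source
val (ticker, fromProducer): (Cancellable, Source[String, NotUsed]) =
  Source.tick(100.millis, 100.millis, "New message")
    .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
    .run()

// Thread-safe collections, because each consumer runs as its own stream
val received1 = new ConcurrentLinkedQueue[String]()
val received2 = new ConcurrentLinkedQueue[String]()

// Consumer 1 is attached right after the hub is materialized
fromProducer.runForeach { msg => received1.add(msg); println("consumer1: " + msg) }

// Consumer 2 is attached 500 ms later, while the producer is already ticking
system.scheduler.scheduleOnce(500.millis) {
  fromProducer.runForeach { msg => received2.add(msg); println("consumer2: " + msg) }
  ()
}

Thread.sleep(1500) // let both consumers receive messages for a while
ticker.cancel()    // stops Source.tick, which completes the hub and its consumers
Await.result(system.terminate(), 5.seconds)
```

Apart from `Keep.both`, the wiring is the same as in the documentation example; each extra `runForeach` on `fromProducer` is simply another materialization of the hub's `Source`, i.e. another consumer.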
However, the beginning of the documentation also states the following:
> It is important to remember that even after constructing the `RunnableGraph` by connecting all the source, sink and different operators, no data will flow through it until it is materialized.
>
> After running (materializing) the `RunnableGraph[T]` we get back the materialized value of type T. Every stream operator can produce a materialized value, and it is the responsibility of the user to combine them to a new type. In the above example, we used `toMat` to indicate that we want to transform the materialized value of the source and sink, and we used the convenience function `Keep.right` to say that we are only interested in the materialized value of the sink.
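To make the quoted point concrete, here is a small sketch with a finite source, assuming Akka 2.6.x in a script or worksheet; `Source(1 to 3)`, `Sink.fold` and the value names (`numbers`, `sumSink`, `leftGraph`, ...) are illustrative and not part of the hub example. Defining the graphs runs nothing; each `run()` is a separate materialization with its own `Future[Int]`, and the `Keep` function only decides which side's materialized value comes back.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("materialization-sketch")

// Blueprints only: defining these values does not start any stream
val numbers: Source[Int, NotUsed] = Source(1 to 3)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// The Keep function chooses which materialized value run() returns
val leftGraph: RunnableGraph[NotUsed] = numbers.toMat(sumSink)(Keep.left)
val rightGraph: RunnableGraph[Future[Int]] = numbers.toMat(sumSink)(Keep.right)
val bothGraph: RunnableGraph[(NotUsed, Future[Int])] = numbers.toMat(sumSink)(Keep.both)

// Keep.left: the stream runs, but only the Source's NotUsed comes back
val leftValue: NotUsed = leftGraph.run()

// Every run() is an independent materialization with its own Future
val sum1: Int = Await.result(rightGraph.run(), 3.seconds)
val sum2: Int = Await.result(rightGraph.run(), 3.seconds)
val (_, sumFuture) = bothGraph.run()
val sum3: Int = Await.result(sumFuture, 3.seconds)

println(s"sum1=$sum1 sum2=$sum2 sum3=$sum3") // prints: sum1=6 sum2=6 sum3=6

Await.result(system.terminate(), 5.seconds)
```

In the hub example the same rule applies: with `Keep.right`, `run()` hands back the sink's materialized value, which for `BroadcastHub.sink` is the consumer-side `Source`.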
So, looking back at the example code, we indeed only care about the materialized value of the `Sink`, which in this case is a `Source[String, NotUsed]`.
But what I do not understand is this: conceptually speaking, the code shown produces a stream to which an arbitrary number of streams can be connected (two in the example above).
The semantics, I’m guessing, are that each `String` emitted by `producer` is cached in the `BroadcastHub`, and every time you connect to `fromProducer`, a new source is created which emits all the previously cached values?
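One way to check this guess empirically is a small experiment with numbered elements, so the output shows whether a late consumer starts at element 1 or somewhere later. This is a sketch assuming Akka 2.6.x in a script or worksheet; the throttled `Source(1 to 20)`, the one-second delay and the names `early`, `late`, `earlySeen` and `lateSeen` are made up for the experiment. It deliberately does not state what `lateSeen` will contain: running it against your Akka version answers that.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("replay-experiment")

// Numbered elements, one every 100 ms, so we can see where each consumer starts
val fromProducer: Source[Int, NotUsed] =
  Source(1 to 20)
    .throttle(1, 100.millis)
    .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
    .run()

// Early consumer, attached immediately
val early: Future[immutable.Seq[Int]] = fromProducer.runWith(Sink.seq)

// Late consumer, attached after roughly ten elements have been published
Thread.sleep(1000)
val late: Future[immutable.Seq[Int]] = fromProducer.runWith(Sink.seq)

val earlySeen: immutable.Seq[Int] = Await.result(early, 10.seconds)
val lateSeen: immutable.Seq[Int] = Await.result(late, 10.seconds)
println(s"early consumer saw: $earlySeen")
println(s"late consumer saw:  $lateSeen")

Await.result(system.terminate(), 5.seconds)
```

Comparing the first element of `lateSeen` with `1` shows directly whether the hub replays earlier elements to a newly attached consumer.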
But then still, shouldn’t the stream `fromProducer` emit a new `Source` every time the
Any clarification is greatly appreciated.