Problem with BroadcastHub.sink dropping items

#1

Hi all,

In the example code below, printing out the byteStrings source works as expected, but when I use BroadcastHub.sink and try the same, I get no output, at least for small streams. Larger streams seem to work as expected. Is there some type of buffering going on that could cause that?

    val byteStrings: Source[ByteString, Any] = ...

    // Prints out as expected (but would drain the stream)
    // byteStrings.runForeach(bs => log.info(s"size = ${bs.size}"))

    val producer = byteStrings.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()

    // Sometimes prints out nothing...
    producer.runForeach(bs => log.info(s"size = ${bs.size}"))

Thanks,
Allan

#2

The problem seems to be the buffer size. If it is larger than the number of items in the source, nothing is printed out. The doc says “If this buffer is full, the producer is backpressured”. So does that mean that if the buffer is not full, nothing happens?

In this example, nothing gets printed out unless you make the buffer size less than the number of items in the source:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

object TestProducer extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  val source = Source(1 to 10)
  // Main stream: starts running (and can finish) before any consumer is attached
  val producer = source.toMat(BroadcastHub.sink(16))(Keep.right).run()
  producer.runForeach(x => println(s"1 Producer x: $x"))
}

(Patrik Nordwall) #3

You have one main stream:

source.toMat(BroadcastHub.sink(16))(Keep.right).run()

to which you can attach other streams, such as producer.runForeach. What happens here is that the main stream has already completed by the time you attach the other stream. Sometimes the attachment might be quick enough to see a few elements before completion.
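
One way to check this explanation is to stop the main stream from completing. The sketch below is only an illustration, not something proposed in this thread: it assumes that appending Source.maybe (which never emits or completes unless its promise is fulfilled) keeps the hub running, and that a single late consumer then still receives the elements sitting in the hub's buffer. The object name KeepHubOpen and the take(10) cut-off are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, used only for this illustration.
object KeepHubOpen {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("keep-hub-open")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Assumption: with Source.maybe appended, the upstream never completes
      // on its own, so the hub is still alive when a consumer attaches later.
      val neverEnding = Source(1 to 10).concat(Source.maybe[Int])
      val producer =
        neverEnding.toMat(BroadcastHub.sink[Int](bufferSize = 16))(Keep.right).run()

      // The consumer has to decide when to stop, here after 10 elements.
      val received = producer.take(10).runWith(Sink.seq)
      Await.result(received, 5.seconds)
    } finally {
      // The main stream never ends by itself, so shut the system down explicitly.
      system.terminate()
    }
  }
}
```

The trade-off is that completion is no longer propagated to consumers, so each consumer needs its own stop condition. With several consumers attaching at different times, later ones may still miss elements that earlier ones have already moved past.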

#4

OK thanks. I rewrote the code to use a RunnableGraph with Broadcast and Merge, as described in https://doc.akka.io/docs/akka/2.5.12/stream/stream-graphs.html?language=scala and that solved the problem.
It was not obvious at first that, when using BroadcastHub, the stream was being emptied before I could read from it.
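
The actual rewrite was not posted, so the following is only a rough sketch of what a static graph along those lines could look like. It uses Broadcast alone (no Merge) with two example sinks; the object name StaticBroadcast and both sinks are assumptions made for illustration. Because every consumer is wired in before the graph runs, no element can be emitted before the consumers exist.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not the code from the post above.
object StaticBroadcast {
  def run(): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("static-broadcast")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val collect = Sink.seq[Int]                // first consumer: keeps every element
      val sum = Sink.fold[Int, Int](0)(_ + _)    // second consumer: adds them up

      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(collect, sum)((_, _)) { implicit builder => (collectShape, sumShape) =>
          import GraphDSL.Implicits._
          // Fan the single source out to both consumers inside one graph.
          val broadcast = builder.add(Broadcast[Int](2))
          Source(1 to 10) ~> broadcast.in
          broadcast.out(0) ~> collectShape.in
          broadcast.out(1) ~> sumShape.in
          ClosedShape
        })

      // Both materialized futures complete once the source is exhausted.
      val (allF, sumF) = graph.run()
      (Await.result(allF, 5.seconds), Await.result(sumF, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

The price of this approach is that the set of consumers is fixed when the graph is built, whereas BroadcastHub allows consumers to come and go while the stream is running.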

(Michael Wellner) #5

I just had a similar issue (see https://stackoverflow.com/questions/55533155/why-is-a-publisher-created-with-sink-aspublisher-not-working-when-it-is-consumed).

I solved the issue by using mapMaterializedValue to attach the first consumer to the source. Maybe this should also be stated somewhere in the docs? I actually spent some time figuring that out …