Reactive-kafka and BroadcastHubs

Hi,

I’m trying to use reactive-kafka with WebSockets to build what is essentially a publish/subscribe proxy to Kafka (clients can add and remove subscriptions to particular topics, and possibly filters within them).

My Play app constantly consumes from a Kafka topic on a given partition. When new clients make a socket.io connection to the server, they simply get an up-to-date, real-time stream of that topic.

The issue I’m having is that my clients seem to be fighting over Kafka messages even when using a BroadcastHub. Essentially, my logs look like this:

[info] m.SocketIOEngineProvider - Sink ignore 111694
[info] m.SocketIOEngineProvider - Sink ignore 111695
[info] m.SocketIOEngineProvider - Sink ignore 111696
[info] m.SocketIOEngineProvider - Sink ignore 111697
[info] m.SocketIOEngineProvider - Sink ignore 111698
[info] m.SocketIOEngineProvider - Sink ignore 111699
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111700
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[info] m.SocketIOEngineProvider - Sink ignore 111701
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[info] m.SocketIOEngineProvider - Sink ignore 111702
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111703
[info] m.SocketIOEngineProvider - Sink ignore 111704
[info] m.SocketIOEngineProvider - Sink ignore 111705
[info] m.SocketIOEngineProvider - Sink ignore 111706
[info] m.SocketIOEngineProvider - Sink ignore 111707
[info] m.SocketIOEngineProvider - Sink ignore 111708
[info] m.SocketIOEngineProvider - Sink ignore 111709
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111710
[info] m.SocketIOEngineProvider - Sink ignore 111711
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111712
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)

As you can see, the messages are split up between the consumers. Here is what I am doing that produces this behaviour. Globally, I create a BroadcastHub like so:

  private val producer
    : Source[ConsumerRecord[String, String], Consumer.Control] =
    Consumer
      .plainExternalSource[String, String](
        kafkaConsumerActor,
        Subscriptions.assignment(new TopicPartition("topic", 0)))

  private val hub
    : RunnableGraph[Source[ConsumerRecord[String, String], NotUsed]] =
    producer.toMat(BroadcastHub.sink)(Keep.right)

And then, before anyone connects (to ensure the producer is always drained):

hub.run().runWith(Sink.ignore)

And then every time a client connects:


        val graph = hub
          .run()
          .map(record => Json.parse(record.value()).as[ExchangeCandleData])
          .to(userSink)

        // Start it up!
        val killSwitch = graph.run()

I assumed the BroadcastHub would fix this issue, but it seems I need something else?

Thanks in advance for the assistance/advice. I’ve been stuck on this issue for quite a while :(

I think what you are doing wrong is that you are running a new hub on every connection. You should run the hub once, capture the materialized source, and then run that source once with Sink.ignore and once more for every connection.
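
For illustration, a minimal sketch of that pattern, reusing the producer source and JSON parsing from your snippets above (onClientConnect is a hypothetical helper, and the same imports plus an implicit Materializer are assumed to be in scope):

  // Materialize the hub ONCE. This starts the single Kafka consumer and
  // yields a Source that any number of downstream streams can attach to.
  val hubSource: Source[ConsumerRecord[String, String], NotUsed] =
    producer.toMat(BroadcastHub.sink)(Keep.right).run()

  // Drain the hub so the consumer keeps flowing even when no clients are attached.
  hubSource.runWith(Sink.ignore)

  // Per connection: attach to the already-running hub instead of materializing it again.
  def onClientConnect(userSink: Sink[ExchangeCandleData, Any]): Unit =
    hubSource
      .map(record => Json.parse(record.value()).as[ExchangeCandleData])
      .runWith(userSink)

This way only one stage ever requests messages from the KafkaConsumerActor, and each attached client sees every element the hub emits after that client attaches.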

Just for reference, this question was originally asked in https://github.com/akka/reactive-kafka/issues/486

Yes indeed, that was the issue. I was making a new hub every time. Thank you for the help!