Hi,
I’m trying to use reactive-kafka
with websockets to allow for essentially a publish subscribe proxy to kafka (clients can add and remove subscriptions to particular topics and possibly filters within them).
My play app will constantly consume from a Kafka topic on a given partition. When new clients make a socket.io connection with the server, they will simply get an up to date real time stream of that topic.
The issue that I’m having is that it seems my clients are fighting for kafka messages even when using a BroadcastHub
. Essentially, I have logs that look like this:
[info] m.SocketIOEngineProvider - Sink ignore 111694
[info] m.SocketIOEngineProvider - Sink ignore 111695
[info] m.SocketIOEngineProvider - Sink ignore 111696
[info] m.SocketIOEngineProvider - Sink ignore 111697
[info] m.SocketIOEngineProvider - Sink ignore 111698
[info] m.SocketIOEngineProvider - Sink ignore 111699
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111700
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[info] m.SocketIOEngineProvider - Sink ignore 111701
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[info] m.SocketIOEngineProvider - Sink ignore 111702
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111703
[info] m.SocketIOEngineProvider - Sink ignore 111704
[info] m.SocketIOEngineProvider - Sink ignore 111705
[info] m.SocketIOEngineProvider - Sink ignore 111706
[info] m.SocketIOEngineProvider - Sink ignore 111707
[info] m.SocketIOEngineProvider - Sink ignore 111708
[info] m.SocketIOEngineProvider - Sink ignore 111709
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111710
[info] m.SocketIOEngineProvider - Sink ignore 111711
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[info] m.SocketIOEngineProvider - d392a846-46f1-4409-90ac-841e7e723576 111712
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
[warn] a.k.KafkaConsumerActor - RequestMessages from topic/partition Set(topic-0) already requested by other stage Set(topic-0)
As you can see, it’s split up between the consumers. What I am doing that results in this behaviour is the following. Globally I create a BroadcastHub
like so:
private val producer
: Source[ConsumerRecord[String, String], Consumer.Control] =
Consumer
.plainExternalSource[String, String](
kafkaConsumerActor,
Subscriptions.assignment(new TopicPartition("topic", 0)))
private val hub
: RunnableGraph[Source[ConsumerRecord[String, String], NotUsed]] =
rawOhlcvProducer.toMat(BroadcastHub.sink)(Keep.right)
And then before anyone connects (to ensure the producer is always drained)
hub.run().runWith(Sink.ignore)
And then every time a client connects:
val graph = hub
.run()
.map(record => Json.parse(record.value()).as[ExchangeCandleData]).to(userSink)
// Start it up!
val killSwitch = graph.run()
I assumed the BroadcastHub would fix this issue, but it seems I need something else?
Thanks for the assistance/advice in advance. I’ve been stuck on this issue for quite a while :(