IllegalArgumentException committing offsets: origin stage must be same as other stage with same groupId

Hello,

I have a stream which consumes from multiple sources (each of them is a Consumer.committableSource reading a Kafka topic) and performs some processing on the input messages before merging them. The last step in the merged stream is committing the offsets, but I get the following error:

java.lang.IllegalArgumentException: requirement failed: CommittableOffset [CommittableOffsetImpl(PartitionOffset(GroupTopicPartition(some_topic,0),0))] origin stage must be same as other stage with same groupId. Expected [KafkaAsyncConsumerCommitterRef(Actor[akka://PrdSys/system/kafka-consumer-3#1201446100],15000 milliseconds)], got [KafkaAsyncConsumerCommitterRef(Actor[akka://PrdSys/system/kafka-consumer-2#-1659888063],15000 milliseconds)]
	at scala.Predef$.require(Predef.scala:277)
	at akka.kafka.internal.ConsumerStage$CommittableOffsetBatchImpl.updated(ConsumerStage.scala:252)
	at com.x.y.z$.$anonfun$runStream$6...
	at akka.stream.impl.fusing.Batch$$anon$23.onPush(Ops.scala:1002)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
	at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
	at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:45)
	at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:41)

If I process a single source and commit its offsets, everything works as expected, but as soon as I add a second source this error appears.

Any idea about what could be happening?

Thanks in advance

Hi jdawnmal,

You cannot mix committable offsets from different Alpakka Kafka consumers in one batch unless those consumers share a single KafkaConsumerActor internally.

You need to keep them apart for committing, because each commit goes back through the underlying actor the offset originally came from.
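To illustrate why the batch rejects mixed origins, here is a simplified model of that kind of check in plain Scala. It is only a sketch and not the Alpakka Kafka source: the names Committer, Offset and Batch are made up for illustration, and the real classes carry more state.

```scala
// Simplified, hypothetical model of the origin check; NOT the library code.
object CommitOriginModel {
  // Stands in for the committer reference tied to one consumer actor.
  final case class Committer(actorName: String)

  // Stands in for a committable offset that remembers where it came from.
  final case class Offset(groupId: String, topic: String, partition: Int,
                          offset: Long, origin: Committer)

  final case class Batch(
      offsets: Map[(String, String, Int), Long] = Map.empty,
      origins: Map[String, Committer] = Map.empty) {

    def updated(o: Offset): Batch = {
      // The first offset seen for a group id fixes that group's origin;
      // every later offset for the same group id must share it.
      origins.get(o.groupId).foreach { expected =>
        require(expected == o.origin,
          "origin stage must be same as other stage with same groupId. " +
            s"Expected [$expected], got [${o.origin}]")
      }
      Batch(
        offsets.updated((o.groupId, o.topic, o.partition), o.offset),
        origins.updated(o.groupId, o.origin))
    }
  }
}

object CommitOriginDemo extends App {
  import CommitOriginModel._

  val consumer2 = Committer("kafka-consumer-2")
  val consumer3 = Committer("kafka-consumer-3")

  // Offsets from one consumer batch together without complaint.
  val batch = Batch()
    .updated(Offset("some_group", "topic_a", 0, 0L, consumer2))
    .updated(Offset("some_group", "topic_a", 0, 1L, consumer2))

  // Same group id, different consumer: require throws IllegalArgumentException.
  val mixed = scala.util.Try(
    batch.updated(Offset("some_group", "topic_b", 0, 0L, consumer3)))
  println(mixed.isFailure)
}
```

In this model the failure comes from sharing one batch between two origins that use the same group id, which matches the "Expected [...], got [...]" pair in the error message above.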

I hope that helps.

Cheers,
Enno.

Thanks Ennru,

You are right. After checking the Alpakka Kafka code, I understand why it was happening. The quick fix was to split the stream so that each source manages its offsets separately. I wonder if there is a more elegant solution; I am still looking for one. Any ideas?
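For anyone landing here later, the split could look roughly like the sketch below: each source batches and commits its own offsets, and the merge happens only after the commit step. This is a sketch under assumptions, not the exact code from my project. The bootstrap server, group id, topic names and the process function are placeholders, and it needs the Akka Streams and Alpakka Kafka dependencies plus a reachable broker to run.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object SeparateCommits extends App {
  implicit val system: ActorSystem = ActorSystem("PrdSys")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Placeholder connection settings (assumed values).
  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("some_group")

  // Placeholder for the real per-message processing.
  def process(value: String): Future[Done] = Future.successful(Done)

  // One independent source per topic. Offsets are batched and committed
  // inside this sub-stream, so a batch only ever sees a single origin.
  def committedSource(topic: String): Source[Done, Consumer.Control] =
    Consumer
      .committableSource(settings, Subscriptions.topics(topic))
      .mapAsync(1)(msg => process(msg.record.value()).map(_ => msg.committableOffset))
      .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
        (batch, elem) => batch.updated(elem)
      }
      .mapAsync(3)(_.commitScaladsl())

  // Merge after committing; only commit results flow past this point.
  committedSource("topic_a")
    .merge(committedSource("topic_b"))
    .runWith(Sink.ignore)
}
```

One trade-off to keep in mind: with this layout the offsets are committed before anything downstream of the merge runs, so any work done after the merge is not covered by the commit.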

Thanks