Hello,
I have a stream that consumes from multiple sources (each one is a Consumer.committableSource reading a Kafka topic) and processes the incoming messages before merging them. The last step in the merged stream commits the offsets, but it fails with the following error:
java.lang.IllegalArgumentException: requirement failed: CommittableOffset [CommittableOffsetImpl(PartitionOffset(GroupTopicPartition(some_topic,0),0))] origin stage must be same as other stage with same groupId. Expected [KafkaAsyncConsumerCommitterRef(Actor[akka://PrdSys/system/kafka-consumer-3#1201446100],15000 milliseconds)], got [KafkaAsyncConsumerCommitterRef(Actor[akka://PrdSys/system/kafka-consumer-2#-1659888063],15000 milliseconds)]
at scala.Predef$.require(Predef.scala:277)
at akka.kafka.internal.ConsumerStage$CommittableOffsetBatchImpl.updated(ConsumerStage.scala:252)
at com.x.y.z$.$anonfun$runStream$6...
at akka.stream.impl.fusing.Batch$$anon$23.onPush(Ops.scala:1002)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:45)
at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:41)
If I process a single source and commit its offsets, everything works as expected, but as soon as I add a second source this error appears.
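A minimal sketch of the kind of setup described above, assuming akka-stream-kafka 0.x on Akka 2.5 and that both sources share the same consumer group. The topic name other_topic, the group id, the broker address, the batch size, and the process step are placeholders for illustration, not the real code:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffsetBatch}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object MergedCommitSketch extends App {
  implicit val system: ActorSystem = ActorSystem("PrdSys")
  // ActorMaterializer is the Akka 2.5 way of running streams.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Assumption: every source uses the same settings, hence the same groupId.
  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder broker address
      .withGroupId("some_group")              // placeholder group id

  // Placeholder for the per-message processing done before the merge.
  def process(msg: CommittableMessage[String, String]): CommittableMessage[String, String] =
    msg

  // Each call creates a separate consumer stage backed by its own consumer actor.
  def source(topic: String) =
    Consumer
      .committableSource(settings, Subscriptions.topics(topic))
      .map(process)

  source("some_topic")
    .merge(source("other_topic"))
    .map(_.committableOffset)
    // Offsets coming from both consumer stages are folded into one batch here.
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
      (batch, offset) => batch.updated(offset)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
}
```

With only source("some_topic") in place the batch step receives offsets from a single stage; after the merge, the same batch receives offsets from two different stages.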
Any idea what could be happening?
Thanks in advance.