How to handle Kafka RetriableCommitFailedException when using committableSource?

We are using Alpakka to consume Kafka messages, but we often hit RetriableCommitFailedException, which makes the consumer stop.

Sample code:

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  .mapAsync(1)(handle)  // handle(msg) must return a Future[CommittableOffset] for the batch stage below
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
    batch.updated(elem)
  }
  .mapAsync(3)(commitBatch)  // commits each accumulated offset batch
  .runWith(Sink.ignore)
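
(commitBatch is not shown here; presumably it just commits the accumulated batch, roughly like this assumed sketch:)

import akka.Done
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import scala.concurrent.Future

// Assumed shape of the commitBatch helper used above: it asks the
// consumer to commit the accumulated offsets and completes once the
// broker acknowledges the commit.
def commitBatch(batch: CommittableOffsetBatch): Future[Done] =
  batch.commitScaladsl()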

Exception:

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.

I know I should re-commit the offsets, but I am new to Akka Streams and don't know how to do it.
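
One approach that seems common is to wrap the whole stream in RestartSource.onFailuresWithBackoff, so the consumer is rebuilt with backoff whenever a commit (or anything else in the stream) fails. A rough sketch, reusing consumerSettings, topic, handle and commitBatch from the snippet above, and assuming Akka 2.6.10+ where RestartSettings is available; the backoff values are only illustrative:

import scala.concurrent.duration._
import akka.kafka.Subscriptions
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Sink}

// Illustrative backoff values; tune them for your workload.
val restartSettings =
  RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)

RestartSource
  .onFailuresWithBackoff(restartSettings) { () =>
    // The source is re-created after each failure, so an uncommitted
    // batch is simply re-consumed and committed again on the next run.
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(1)(handle)
      .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(3)(commitBatch)
  }
  .runWith(Sink.ignore)

Because the restarted stream resumes from the last committed offset, messages processed after that offset are redelivered, so handle has to tolerate at-least-once delivery.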

I am also facing the same exception. Any suggestions?

2020-09-20 21:54:23.583 [kafka-coordinator-heartbeat-thread | ceon-current-local-read] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=ceon-current-local-read] Offset commit failed on partition ceon-cds-sync-event0-0 at offset 0: This is not the correct coordinator.
2020-09-20 21:54:24.172 [ceon-current-data-akka.actor.default-dispatcher-20] WARN  a.kafka.internal.KafkaConsumerActor - Kafka commit took longer than `commit-time-warning`: 3039 ms
2020-09-20 21:54:24.188 [ceon-current-data-akka.actor.default-dispatcher-21] ERROR o.d.c.s.current.actors.KafkaConsumer - Exception in stream processing
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null

Hi @bvinayakumar. Alpakka Kafka recently added support for retrying retriable commit failures. It is available in Alpakka Kafka 2.0.2 and later. Are you using a recent version?
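
If not, bumping the dependency should be enough to pick up the built-in retries, e.g. in sbt:

// build.sbt -- 2.0.2 is the first release with the commit retry support mentioned above
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "2.0.2"

Also, since you are batching offsets by hand, you may want to look at Committer.sink, which does the batching and committing for you. A minimal sketch, reusing consumerSettings, topic and handle from the first snippet, and assuming an ActorSystem named system is in scope for settings and materialization:

import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.Keep

val committerSettings = CommitterSettings(system)

val control =
  Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1)(handle)  // must emit each message's CommittableOffset
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

The DrainingControl then gives you control.drainAndShutdown() for a clean stop that finishes committing what has already been processed.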