How to handle Kafka RetriableCommitFailedException when using committableSource?

scala

(Lawulu) #1

We are using Alpakka to consume Kafka messages, but we often hit a RetriableCommitFailedException, which stops the consumer.

Sample code:

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  .mapAsync(1) { msg =>
    handle(msg)
  }
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
    batch.updated(elem)
  }
  .mapAsync(3)(commitBatch(_))
  .runWith(Sink.ignore)

Exception:

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.

I know I should re-commit the offsets, but I am new to Akka Streams and don't know how to do it.
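
To make the "re-commit" idea concrete, here is a minimal sketch of what a retry around the commit step might look like. It uses only standard-library Futures. CommitRetry, retry, and isRetriable are hypothetical names made up for illustration, not part of Alpakka, and the retry count is an arbitrary assumption.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not an Alpakka API): re-runs an asynchronous
// operation while it keeps failing with an error the caller marks as retriable.
object CommitRetry {
  // `retries` is the number of additional attempts after the first one.
  def retry[T](retries: Int)(isRetriable: Throwable => Boolean)(op: () => Future[T])(
      implicit ec: ExecutionContext): Future[T] =
    // Future.unit.flatMap turns a synchronous throw from `op` into a failed Future.
    Future.unit.flatMap(_ => op()).recoverWith {
      case e if retries > 0 && isRetriable(e) =>
        retry(retries - 1)(isRetriable)(op)
    }
}
```

In the stream above, the commit stage could then presumably become something like .mapAsync(3)(batch => CommitRetry.retry(3)(_.isInstanceOf[RetriableCommitFailedException])(() => commitBatch(batch))), assuming commitBatch returns a Future and an implicit ExecutionContext is in scope. Once the retries are exhausted the failure still propagates and stops the stream, so this only helps with transient commit errors.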