Akka Streams - Kafka offset reset after partition reassignment

Hi all.

My team and I have been facing an interesting issue and would be happy to get your input on it.
We are running Akka Streams over Kafka, using MergePrioritized to merge streams from several topics.

Here is the behaviour we are witnessing:

Consumer A starts up, and starts consuming.
Consumer A finishes processing up to offset 1000 for partition X (committing)
Consumer B starts up, and a reassignment occurs.
Consumer B is assigned partition X
Consumer B starts processing from offset 920 for partition X

So we end up reprocessing ~50-100 messages per partition, every time a reassignment occurs.
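
To make the numbers above concrete, here is a minimal sketch of the arithmetic. It assumes that a newly assigned consumer resumes from the last offset whose commit actually reached the broker, so the gap between "processed" and "committed" is what gets reprocessed. The names RedeliveryGap and PartitionProgress are hypothetical and only serve as an illustration; they are not part of our real code.

```scala
object RedeliveryGap {
  // Hypothetical model of one partition's progress, as seen by the old owner.
  final case class PartitionProgress(lastProcessed: Long, lastCommitted: Long) {
    // Assumption: the new owner resumes at the last committed position.
    def resumeFrom: Long = lastCommitted

    // Messages between the committed and processed positions are handled twice.
    def redelivered: Long = math.max(0L, lastProcessed - lastCommitted)
  }

  def main(args: Array[String]): Unit = {
    // Values taken from the scenario described above.
    val partitionX = PartitionProgress(lastProcessed = 1000L, lastCommitted = 920L)
    println(s"resume at ${partitionX.resumeFrom}, reprocess ${partitionX.redelivered} messages")
    // prints "resume at 920, reprocess 80 messages"
  }
}
```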

I assume this can’t be completely avoided, but I was wondering if there is something we are doing wrong, or a way in which we can minimize this reprocessing behaviour.

Here is what our code looks like:

  private val consumerSettings = ConsumerSettings(actorSystem,
    new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(kafka_bootstrap_servers)
    .withGroupId("job_priority_consumer")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    .withMaxWakeups(24 * 60)
    .withWakeupTimeout(60 seconds)

  private val stream1 = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic_1)).buffer(2000, OverflowStrategy.backpressure)
  private val stream2 = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic_2)).buffer(1000, OverflowStrategy.backpressure)
  private val stream3 = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic_3)).buffer(5000, OverflowStrategy.backpressure)
  private val stream4 = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic_4)).buffer(2000, OverflowStrategy.backpressure)

  private val PRIORITY_1 = 2
  private val PRIORITY_2 = 1
  private val PRIORITY_3 = 5
  private val PRIORITY_4 = 2

  val jobsProcessing: NotUsed = Source.combine(
    stream1,
    stream2,
    stream3,
    stream4
  )(
    _ =>
      MergePrioritized(Seq(PRIORITY_1, PRIORITY_2, PRIORITY_3, PRIORITY_4))
  ).mapAsync(100) {
    msg =>
      handleMessage(msg.record).recover(PartialFunction(handleError)).map(_ -> msg)
  }.map {
    msg =>
      //commit message
      msg._2.committableOffset.commitScaladsl()
  }.withAttributes(supervisionStrategy(stoppingDecider)).to(Sink.ignore).run()

  private def handleError(t: Throwable) = {
    t match {
      case NonFatal(e) =>
        Logger.warn(e.getLocalizedMessage)
        Future.successful(())
    }
  }

Please let me know if any additional info is missing here.

Hi Josef,

Committing is likely the culprit here.

Committing offsets to Kafka one by one is very slow. You should use batched commits instead (which will just commit the latest offset for each partition).
If you are on the latest Alpakka Kafka version 1.0-M1, you may use the new Committer flow for convenience. If not, use groupedWithin as shown in the fourth snippet in this docs section: https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#offset-storage-in-kafka-committing
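
As a rough illustration, the groupedWithin variant could look like the sketch below for a single topic. It is not a drop-in replacement for your graph: the bootstrap address, the topic name, the batch size of 100, the 1 second window and the handleMessage stub are all assumptions made for the example, and it is written against the Akka 2.5-era API (ActorMaterializer).

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.Future
import scala.concurrent.duration._

object BatchedCommitSketch {
  implicit val system: ActorSystem = ActorSystem("batched-commit-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical stand-in for the real business logic.
  def handleMessage(record: ConsumerRecord[Array[Byte], String]): Future[Unit] =
    Future.successful(())

  val consumerSettings: ConsumerSettings[Array[Byte], String] =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // assumed address
      .withGroupId("job_priority_consumer")

  def run(): Consumer.Control =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic_1")) // assumed topic name
      .mapAsync(100) { msg =>
        // Pass the offset downstream only after the message has been handled.
        handleMessage(msg.record).map(_ => msg.committableOffset)
      }
      // Collect up to 100 offsets, or whatever arrived within 1 second.
      .groupedWithin(100, 1.second)
      // Fold the group into one batch; it keeps the latest offset per partition.
      .map(_.foldLeft(CommittableOffsetBatch.empty)((batch, offset) => batch.updated(offset)))
      // mapAsync waits for each commit to complete instead of firing and forgetting.
      .mapAsync(3)(_.commitScaladsl())
      .toMat(Sink.ignore)(Keep.left)
      .run()
}
```

Delivery is still at-least-once with this approach, so some reprocessing after a reassignment remains possible. The batch size and time window bound how far the committed offset can lag behind processing, which is the trade-off to tune.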

I hope this helps you lower the number of redeliveries.

Cheers,
Enno.