Increasing consumers in a consumer group causes rebalance failure with CommitFailedException because of revoked partitions

I think this issue is related to #539, but I don't know whether it is a bug or something the user is supposed to handle themselves.

So I have a consumer group, and whenever I increase the number of consumers in that group, the partition revocation causes the following error:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:778)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:617)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:584)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1479)
	at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$commit(KafkaConsumerActor.scala:430)
	at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:210)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
	at akka.actor.Timers.aroundReceive(Timers.scala:55)
	at akka.actor.Timers.aroundReceive$(Timers.scala:40)
	at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

This does not happen when I scale down the number of consumers, or at least I have not observed it so far. I assume this is because partitions are not revoked from the remaining consumers on scale-down; they just get additional partitions.

Note that I group messages and commit in batches.

Here is what my code looks like:

val source = Consumer.committableSource(consumerSettings, subscription)
      .async
      .groupBy(Int.MaxValue, computeNamedGraph)
      .groupedWithin(conf.tripleStoreSettings.batchSize, conf.tripleStoreSettings.batchWindowSec seconds)
      .map(toUpdateStatements)
      .async
      .mergeSubstreams
      .map(toHttpRequest)
      .map(p => p.data -> p)
      .via(poolClientFlow)
      .async
      .map { case (response, payload) => Payload(response, payload.offsets) }
      .mapConcat(handleResponse)
      .via(Committer.flow(committerDefaults.withMaxBatch(conf.tripleStoreSettings.batchSize)))

    val (killSwitch, streamResults) = source
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

    streamResults.onComplete {
      case Success(_) =>
        logger.info("Stream finished")
        system.terminate()
      case Failure(e) =>
        logger.error("Stream failed:", e)
        system.terminate()
    }

My decider just does the following:

 private val decider: Supervision.Decider = {
    e => {
      logger.error(s"Stream failed. ${e.getMessage} ${e.getStackTrace.map(_.toString).mkString("\n")}", e)
      Supervision.Stop
    }
  }
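
In case it is relevant, this is how such a decider is typically attached in Akka Streams (a minimal sketch; `supervisedSource` is just an illustrative name):

// Sketch: attaching the decider as a stream attribute.
// With Supervision.Stop the stream still fails on CommitFailedException;
// the attribute only ensures this decider (and its logging) is the one used.
import akka.stream.ActorAttributes

val supervisedSource = source.withAttributes(ActorAttributes.supervisionStrategy(decider))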

So based on my reading of #539, I understand that I have a number of in-flight messages whose offsets can no longer be committed because of the revocation. That is, scaling up the number of consumers triggers a rebalance that involves revoking partitions.

My service is at-least-once, so I don't mind if another consumer reprocesses those messages; we don't have an at-most-once delivery constraint.

My question is: until the library handles this situation natively, how can I go about committing those offsets anyway whenever a revocation occurs, or better yet, just discarding them, so that the consumer that gets assigned their partitions will reprocess them?
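
The only workaround I can think of so far is to restart the whole stream on failure and accept the reprocessing, roughly like the sketch below (Akka 2.5-style signature; the backoff values are arbitrary and `restartingStream` is just an illustrative name). I would prefer something that reacts to the revocation itself:

// Sketch, not a library-prescribed fix: restart the consumer stream whenever it fails
// (e.g. with CommitFailedException after a rebalance). Since the service is at-least-once,
// reprocessing the uncommitted messages after the restart is acceptable.
import scala.concurrent.duration._
import akka.stream.scaladsl.{RestartSource, Sink}

val restartingStream = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    source // the committable stream defined above, ending in Committer.flow
  }
  .runWith(Sink.ignore) // uses the implicit materializer from the surrounding code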

Any suggestions? I looked at the RebalanceListener, but I am not sure how to use it in this situation.
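
For reference, this is as far as I got with it, based on the docs (just a sketch; the listener only logs, and I don't see how to commit or drop the in-flight offsets from there):

// Sketch of wiring up a rebalance listener: an actor that receives
// TopicPartitionsAssigned / TopicPartitionsRevoked events from the consumer.
import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.{Subscriptions, TopicPartitionsAssigned, TopicPartitionsRevoked}

class RebalanceListener extends Actor with ActorLogging {
  override def receive: Receive = {
    case TopicPartitionsAssigned(_, partitions) =>
      log.info(s"Assigned: $partitions")
    case TopicPartitionsRevoked(_, partitions) =>
      log.info(s"Revoked: $partitions")
  }
}

val rebalanceListener = system.actorOf(Props[RebalanceListener]())

// Attach it to the subscription passed to Consumer.committableSource
val subscriptionWithListener = Subscriptions
  .topicPattern(conf.kafkaConsumer.sourceTopic)
  .withRebalanceListener(rebalanceListener)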

Note my timeout configs:

val subscription = Subscriptions.topicPattern(conf.kafkaConsumer.sourceTopic)
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(conf.kafkaBroker.bootstrapServers)
      .withGroupId(conf.kafkaConsumer.groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, conf.kafkaConsumer.offsetReset)
      .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000000")
      .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100000")

Hi Maatary,

Please don't cross-post the same question in multiple forums.

If you want to draw attention to an issue, reference it from here instead.

Cheers,
Enno.

PS: This misbehaviour is discussed in https://github.com/akka/alpakka-kafka/issues/750