Unexpected Control.drainAndShutdown behavior

My program (it's Kotlin, but it should be pretty self-explanatory):

val consumerSettings = ConsumerSettings.create(config, StringDeserializer(), StringDeserializer())
  .withBootstrapServers("kafka-0.kafka:9092")
  .withGroupId("test-group")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  .withClientId("test-consumer-" + (system.lookupRoot().path().address()))

val committerSink: Graph<SinkShape<ConsumerMessage.CommittableOffset>, CompletionStage<Done>>
  = Committer.sink<ConsumerMessage.CommittableOffset>(CommitterSettings.create(system.settings().config().getConfig("akka.kafka.committer")))

@Volatile var lastConsumerControl: Consumer.Control? = null
val consumer: CompletionStage<Done> =
  RestartSource.onFailuresWithBackoff(
    Duration.ofSeconds(3),
    Duration.ofSeconds(30),
    0.2,
    20) {
      Consumer.committableSource(consumerSettings, Subscriptions.topics("test-topic"))
        .mapAsync(4) { msg ->
          log.info("Processing $msg")
          // Do some processing and return a CompletionStage.
          // The actual work is elided; what matters is that the stage completes
          // with the message's committable offset for the committer sink downstream.
          CompletableFuture.completedFuture(msg.committableOffset())
        }
        .mapMaterializedValue {
          // remember the Control of the latest (re)started source so it can be drained on shutdown
          lastConsumerControl = it
        }
    }
    .log("consumer-stream")
    .addAttributes(
      Attributes.createLogLevels(
        Attributes.logLevelOff(), // onElement
        Attributes.logLevelError(), // onFailure
        Attributes.logLevelWarning())) // onFinish
    .toMat(committerSink, Keep.right())
    .run(ActorMaterializer.create(system))
    .thenApply {
      log.warn("Consumer future is done!")
      it
    }

suspend fun main() {
  log.info("Started")
  CoordinatedShutdown.get(system).addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate(), "stop-kafka-consumer", Supplier {
    log.info("triggered termination")
    val completionStage: CompletionStage<Done> = (lastConsumerControl?.drainAndShutdown(consumer, Runnable::run)
      ?: CompletableFuture.completedFuture(done()))
      .thenApply { log.warn("drainAndShutdown future is done!"); it }
    completionStage
  })
  consumer.await()
  log.info("Exiting")
}

Scenario: send SIGTERM to the process.
Expected behavior: the process terminates immediately.
Observed behavior: the process terminates only after the before-actor-system-terminate phase times out.

Output:

2019-08-20 17:58:17,107 INFO  [TestConsumer] [tcgd-akka.actor.default-dispatcher-3] [NO_DOMAIN] [none] [] - triggered termination
[ERROR] [08/20/2019 17:58:17.115] [tcgd-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://tcgd/system/StreamSupervisor-0)] [consumer-stream] Upstream finished.
2019-08-20 17:58:17,133 WARN  [TestConsumer] [Thread-2] [NO_DOMAIN] [none] [] - Consumer future is done!
2019-08-20 17:58:17,138 INFO  [TestConsumer] [Thread-2] [NO_DOMAIN] [none] [] - Exiting
[WARN] [08/20/2019 17:58:30.161] [tcgd-akka.actor.default-dispatcher-2] [CoordinatedShutdown(akka://tcgd)] Coordinated shutdown phase [before-actor-system-terminate] timed out after 13000 milliseconds
2019-08-20 17:58:30,202 INFO  [o.a.k.c.c.i.AbstractCoordinator] [tcgd-akka.kafka.default-dispatcher-6] [NO_DOMAIN] [none] [] - [Consumer clientId=test-consumer-akka://tcgd, groupId=test-group] Sending LeaveGroup request to coordinator kafka-0.kafka.default.svc.cluster.local:9092 (id: 2147483647 rack: null)
2019-08-20 17:58:30,203 WARN  [TestConsumer] [Thread-3] [NO_DOMAIN] [none] [] - drainAndShutdown future is done!

Note: I've increased the timeout of that shutdown phase to 13s to make sure that drainAndShutdown really waits until the end of the phase.
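
For reference, the override looks roughly like this (a sketch; the exact config loading in my setup is assumed, not shown verbatim):

// com.typesafe.config.ConfigFactory and akka.actor.ActorSystem
val config = ConfigFactory.parseString(
    "akka.coordinated-shutdown.phases.before-actor-system-terminate.timeout = 13s")
  .withFallback(ConfigFactory.load())
val system = ActorSystem.create("tcgd", config)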

Why does the drainAndShutdown future resolve only after the before-actor-system-terminate phase times out? I'd expect the stream to shut down immediately.

Hi Igor,

Are you aware of the stop-timeout setting, which keeps the Alpakka Kafka consumer alive after its shutdown has been requested?
See https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-control
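
Since committing is handled by the Committer sink in your stream, that timeout can usually be set to zero so the consumer stops as soon as draining is done. A minimal sketch against your settings (untested, assuming withStopTimeout is available in your Alpakka Kafka version):

val consumerSettings = ConsumerSettings.create(config, StringDeserializer(), StringDeserializer())
  .withBootstrapServers("kafka-0.kafka:9092")
  .withGroupId("test-group")
  // same effect as setting akka.kafka.consumer.stop-timeout = 0s in config
  .withStopTimeout(Duration.ZERO)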

Cheers,
Enno.

You’re right, I somehow overlooked the fact that stop-timeout keeps the consumer alive. Thanks, that helped!