Alpakka Kafka consumer gets destroyed automatically after 20s

(Bifunctor) #1

Hi all

I have a Kafka consumer that shuts down after 20s when it does not receive any messages.
After that, it can no longer receive messages, because the actor has been destroyed.
How can I prevent the consumer from being destroyed after a certain time?

I am using the following consumer configuration, copied from the Alpakka website:

# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  # Controls the interval from one scheduled poll to the next.
  poll-interval = 50ms
  
  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that non-zero value means that the thread that
  # is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
  poll-timeout = 50ms
  
  # The stage will delay stopping the internal actor to allow processing of
  # messages already in the stream (required for successful committing).
  # Prefer use of `DrainingControl` over a large stop-timeout.
  stop-timeout = 30s
  
  # Duration to wait for `KafkaConsumer.close` to finish.
  close-timeout = 20s
  
  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with a `CommitTimeoutException`.
  commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s

  # Not used anymore (since 1.0-RC1)
  # wakeup-timeout = 3s

  # Not used anymore (since 1.0-RC1)
  # max-wakeups = 10

  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite

  # Not used anymore (since 1.0-RC1)
  # wakeup-debug = true
  
  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

  # Time to wait for pending requests when a partition is closed
  wait-close-partition = 500ms

  # Limits the query to Kafka for a topic's position
  position-timeout = 5s

  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
  # call to Kafka's API
  offset-for-times-timeout = 5s

  # Timeout for akka.kafka.Metadata requests
  # This value is used instead of Kafka's default from `default.api.timeout.ms`
  # which is 1 minute.
  metadata-request-timeout = 5s
}

Thanks

(Enno) #2

It sounds more likely you have an operator in your stream that shuts down processing.

Enno.

(Bifunctor) #3

Here is the code:

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws._
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.{Future, Promise}

final case class WsGraph(logger: Logger, sink: Sink[Message, Future[Done]])(implicit val system: ActorSystem) {


  private implicit val materializer = ActorMaterializer()
  private implicit val akka = system.settings.config.getConfig("akka.kafka.consumer")
  private implicit val executor = system.dispatcher
  private val consumerSetup = system.settings.config.getConfig("kafka.consumer.setup")
  private val wsSetup = system.settings.config.getConfig("websocket.setup")

  private val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(akka, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(consumerSetup.getString("bootStrapServers"))
      .withGroupId(consumerSetup.getString("groupId"))
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  private val kafkaAsSource: Source[Message, (Consumer.Control, Promise[Option[Message]])] = Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.topics(consumerSetup.getString("topics"))
    )
    .map(msg => TextMessage(msg.value()))
    .concatMat(Source.maybe[Message])(Keep.both)
    .mapAsync(Runtime.getRuntime.availableProcessors())(Future(_))

 
  private val socketFlow: Flow[Message, Message, (Consumer.Control, Promise[Option[Message]])] =
    Flow.fromSinkAndSourceMat(sink, kafkaAsSource)(Keep.right)


  private val (upgradeResponse, (draining, _)) =
    Http().singleWebSocketRequest(
      WebSocketRequest(wsSetup.getString("server")),
      socketFlow)

  val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      logger.info("Switching protocols")
      Right(Done)
    } else {
      Left(s"Connection failed: ${upgrade.response.status}")
    }
  }

  sys.addShutdownHook {
    draining.shutdown()
    logger.info("Draining websocket resource.")
  }

}

I could not figure out why the consumer stops.
It would be great if you could help.

Thanks

(Bifunctor) #4

The log also shows:

20:15:08.378 [SAP-SENDER-akka.actor.default-dispatcher-32] DEBUG akka.kafka.internal.KafkaConsumerActor - Received Stop from Actor[akka://SAP-SENDER/system/StreamSupervisor-0/$$a#-730090264], stopping
20:15:08.380 [SAP-SENDER-akka.actor.default-dispatcher-32] DEBUG akka.actor.TimerScheduler - Cancel all timers
20:15:08.380 [kafka-coordinator-heartbeat-thread | SAP-SENDER-GROUP)] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] Heartbeat thread has closed
20:15:08.380 [SAP-SENDER-akka.kafka.default-dispatcher-42] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] Sending LeaveGroup request to coordinator sweetsoft:9092 (id: 2147483647 rack: null)
20:15:08.382 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] LeaveGroup request returned successfully
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-sent
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-received
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.latency
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] Kafka consumer has been closed
20:15:08.387 [SAP-SENDER-akka.actor.default-dispatcher-32] INFO akka.actor.RepointableActorRef - Message [akka.kafka.KafkaConsumerActor$Stop$] from Actor[akka://SAP-SENDER/system/StreamSupervisor-0/$$a#-730090264] to Actor[akka://SAP-SENDER/system/kafka-consumer-1#363756220] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://SAP-SENDER/system/kafka-consumer-1#363756220]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

What could be wrong?

(Bifunctor) #5

I figured out that when the web server is no longer active or is shutting down, the Kafka consumer closes.

How can I check whether the web server is still active? If it is not, the actor should restart the WebSocket client and the consumer.
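
One possible approach, rather than polling the web server, is to let the stream itself notice the lost connection and rebuild both the WebSocket client and the Kafka consumer. The sketch below is an untested illustration, not a confirmed fix: it wraps the consumer and `Http().webSocketClientFlow` in `RestartSource.withBackoff`, which re-creates the wrapped source whenever it completes or fails. The names `ReconnectingBridge`, `reconnecting`, `topic` and `wsUri` and the backoff values are assumptions made for this example. It also assumes the Akka 2.5-era overload of `withBackoff`; later Akka versions replace it with a `RestartSettings` parameter.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{RestartSource, Source}

import scala.concurrent.duration._

// Hypothetical helper, not part of Alpakka Kafka or Akka HTTP.
object ReconnectingBridge {

  // Emits the messages sent back by the WebSocket server.
  // Each (re)start creates a fresh Kafka consumer and a fresh WebSocket connection.
  def reconnecting(
      consumerSettings: ConsumerSettings[String, String],
      topic: String, // assumed: a single topic name
      wsUri: String  // assumed: e.g. the value of websocket.setup.server
  )(implicit system: ActorSystem): Source[Message, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 1.second,  // assumed values, tune as needed
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ) { () =>
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics(topic))
        .map[Message](record => TextMessage(record.value()))
        // When the server goes away, this flow completes or fails,
        // which stops the consumer and triggers a restart after the backoff.
        .via(Http().webSocketClientFlow(WebSocketRequest(wsUri)))
    }
}
```

The resulting source would be run with a sink and the implicit materializer already defined in `WsGraph`, for example `ReconnectingBridge.reconnecting(consumerSettings, topic, uri).runWith(sink)`. Note that records polled while the connection is down may be skipped after a restart, since the consumer above does not commit offsets and uses `auto.offset.reset = latest`.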