Detect a stopped consumer when using actorRefWithAck

Hello.

I’m having some issues with one of my Kafka-consuming actors. The actor’s job is to consume from multiple Kafka topics and to store the contents of each message in the database, depending on which topic it came from.

I’m using Play Framework 2.6 and akka-stream-kafka 0.22.

It’s set up like this:

    val consumerSettings = ConsumerSettings(config.underlying, deserializer, deserializer)
      .withBootstrapServers(kafkaUrl)
      .withGroupId(groupId)
      .withClientId(clientId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val sink = Sink.actorRefWithAck(
        system.actorOf(StoringActor.props),
        onInitMessage     = Init,
        ackMessage        = Ack,
        onCompleteMessage = Done,
        onFailureMessage  = (ex: Throwable) => StreamFailure(ex))

    Consumer.plainSource(consumerSettings, Constants.SUBSCRIPTION_ALL_RELEVANT)
      .map(rec => KafkaMessage(rec.topic(), rec.value()))
      .runWith(sink)(materializer)

The StoringActor responds with an Ack after every message is handled.

Storing is done like this:

    import java.time.LocalDateTime

    import akka.actor.{Actor, ActorRef}
    import akka.pattern.pipe
    import play.api.Logger
    import play.api.libs.json.Json

    class StoringActor extends Actor {
      import context.dispatcher // ExecutionContext for map and pipeTo

      private def store(topic: String, value: String, parent: ActorRef): Unit = {
        Logger.debug(s"${LocalDateTime.now().toString} - Kafka package on topic: $topic")

        topic match {
          // Backticks compare against the existing value myTopic1; a bare
          // lowercase name would bind a new variable and match every topic.
          case `myTopic1` =>
            Json.parse(value).asOpt[SomeObject] match {
              case Some(someObject) =>
                Database
                  .store(someObject)
                  .map(_ => Ack)
                  .pipeTo(parent)

              case None =>
                Logger.error(s"Cannot parse in Topic: $topic. Value: $value")
                parent ! Ack
            }

          case _ => parent ! Ack
        }
      }

      def receive = {
        case KafkaMessage(topic, value) => store(topic, value, sender())

        case Init =>
          Logger.info("StorageActors Stream Initialized")
          sender() ! Ack

        case StreamFailure(ex) =>
          Logger.error("StorageActors stream failed.", ex)

        case Done =>
          Logger.error("Stream was Completed.")

        case _ =>
          Logger.error("Unexpected message received in StorageActor ")
          sender() ! Ack
      }
    }

My Issue
After a while (it can be up to a week) this actor suddenly stops consuming and does not log any apparent errors. It is as if one message never gets its Ack, and the stream is stuck waiting for a reply.
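
One path in the store method above can end without an Ack: if the Future returned by Database.store fails, `.map(_ => Ack)` never runs, and pipeTo delivers an akka.actor.Status.Failure to the stream instead of Ack. Whether that is the actual cause here is an assumption, not something the logs confirm. The sketch below uses only the Scala standard library to show the difference; `failingStore` is a hypothetical stand-in for Database.store.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal

object AckOnFailureSketch {
  case object Ack

  // Hypothetical stand-in for Database.store: a write that fails.
  def failingStore(): Future[Unit] =
    Future.failed(new RuntimeException("db unavailable"))

  // Same shape as in the store method: map is skipped when the Future
  // fails, so the result is a failed Future and no Ack is produced.
  def ackOnlyOnSuccess(): Future[Ack.type] =
    failingStore().map(_ => Ack)

  // Recovering turns a failed write into an Ack as well, so the
  // stream can move on to the next element (after logging the error).
  def ackAlways(): Future[Ack.type] =
    failingStore()
      .map(_ => Ack)
      .recover { case NonFatal(ex) =>
        println(s"Store failed, acking anyway: ${ex.getMessage}")
        Ack
      }
}
```

Acking after a failed write drops that message, so whether to ack, retry, or fail the stream in that case is a design decision for the application.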

I have been looking at introducing an actor supervisor, but in my case there is no thrown error for it to react to.
Are there any methods to check whether a running actor is responsive, and to restart it if it stops responding? I imagine I need some sort of ping system from a supervisor that checks the health of the actor every n minutes.

Any help or tips are greatly appreciated.
Thanks!

You may watch an actor from another actor: https://doc.akka.io/docs/akka/current/actors.html#lifecycle-monitoring-aka-deathwatch
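
A minimal sketch of DeathWatch with classic Akka actors, in case it helps. `Watcher` and `onStopped` are hypothetical names made up for this example. Note that Terminated is only delivered once the watched actor has actually stopped, so an actor that is alive but stalled will not trigger it.

```scala
import akka.actor.{Actor, ActorRef, Terminated}

object DeathWatchSketch {
  // Watches `target` and runs `onStopped` once it has terminated.
  class Watcher(target: ActorRef, onStopped: () => Unit) extends Actor {
    // Register for a Terminated message when `target` stops.
    context.watch(target)

    def receive: Receive = {
      case Terminated(ref) if ref == target =>
        onStopped()        // e.g. log, alert, or start a replacement
        context.stop(self) // nothing left to watch
    }
  }
}
```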

I hope that helps,
Enno.

Thank you for the reply @ennru

I moved the materialization of the Source into the StoringActor, added context.setReceiveTimeout(NO_PACKAGE_RECEIVED_TIMEOUT), and added this receive case:

    case ReceiveTimeout =>
      Logger.error("StoringActor received timeout. Stopping stream and actor")
      consumerController
        .shutdown()
        .onComplete(_ => context.stop(self))

I then spawned the StoringActor under a supervisor created with Backoff.onStop().

When the consumer for whatever reason stops getting packages (or won’t Ack), the StoringActor stops the stream and itself, and the supervisor starts a new StoringActor.
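
For anyone who finds this later, here is a rough sketch of that arrangement without the Kafka parts, assuming Akka 2.5 (the version Play 2.6 uses), where Backoff.onStop is available; as far as I know, newer Akka versions offer BackoffOpts.onStop instead. `IdleAwareActor`, `StreamStopped`, `stopStream` and `supervisorProps` are hypothetical names; `stopStream` stands in for consumerController.shutdown(). The one difference from my snippet above is that the shutdown result is piped back to the actor as a message, so `context` is never touched from the Future's callback thread.

```scala
import akka.Done
import akka.actor.{Actor, Props, ReceiveTimeout}
import akka.pattern.{Backoff, BackoffSupervisor, pipe}
import scala.concurrent.Future
import scala.concurrent.duration._

object RestartOnIdleSketch {
  // Hypothetical internal message: the stream has finished shutting down.
  case object StreamStopped

  // `stopStream` is a stand-in for consumerController.shutdown().
  class IdleAwareActor(idle: FiniteDuration, stopStream: () => Future[Done]) extends Actor {
    import context.dispatcher

    // A ReceiveTimeout message arrives after `idle` without any message.
    override def preStart(): Unit = context.setReceiveTimeout(idle)

    def receive: Receive = {
      case ReceiveTimeout =>
        // Switch the timer off while the stream is shutting down.
        context.setReceiveTimeout(Duration.Undefined)
        // Report completion (successful or not) back as a message.
        stopStream()
          .map(_ => StreamStopped)
          .recover { case _ => StreamStopped }
          .pipeTo(self)

      case StreamStopped =>
        context.stop(self) // the supervisor notices the stop and restarts

      case _ => // regular message handling would go here
    }
  }

  // Supervisor that starts a fresh child some time after the old one stops.
  def supervisorProps(
      idle: FiniteDuration,
      minBackoff: FiniteDuration,
      stopStream: () => Future[Done]): Props =
    BackoffSupervisor.props(
      Backoff.onStop(
        Props(new IdleAwareActor(idle, stopStream)),
        "storing-actor", // child name
        minBackoff,      // delay before the first restart
        30.seconds,      // upper bound for the growing delay
        0.2))            // random jitter factor
}
```

Starting it would look like `system.actorOf(RestartOnIdleSketch.supervisorProps(5.minutes, 3.seconds, () => control.shutdown()))`, where `control` is whatever handle the stream materializes.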

I have not been able to confirm if it actually works in prod yet, but it does what I needed :)