Kafka consumer actor stops after a SerializationException


(Politrons) #1

I'm using Kafka-client with Akka Streams, and I have a supervision strategy defined in my materializer:

    materializer = ActorMaterializer(createActorMaterializerSettings(strategyDecider, inputBuffer, system))

    private def createActorMaterializerSettings(
        strategyDecider: function.Function[Throwable, Supervision.Directive],
        inputBuffer: Int,
        system: ActorSystem) = {
      ActorMaterializerSettings(system)
        .withDispatcher("akka.stream-sink-dispatcher")
        .withSupervisionStrategy(toScalaFunction(strategyDecider))
        .withInputBuffer(initialSize = 1, maxSize = inputBuffer)
    }

When I receive a message that I cannot deserialize, the Akka stream dies with:

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition

The stream doesn't use the supervision strategy and just dies.

The source is created like this:

    Consumer.plainSource(buildConsumerSettings(consumerConfig), Subscriptions.topics(topicName))

As a workaround, how about using a backoff strategy? https://doc.akka.io/docs/akka-stream-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage
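A minimal sketch of what the backoff workaround could look like, assuming the Akka 2.5-era `RestartSource.withBackoff` signature (newer Akka versions take a `RestartSettings` instead). The object name `RestartingConsumer`, the method `restartingSource`, and the backoff values are hypothetical, and `String` keys and values are assumed only for illustration:

```scala
import akka.NotUsed
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Source}
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.duration._

// Hypothetical helper, not part of the original code.
object RestartingConsumer {

  // Wraps the Kafka source so that a failure (for example a
  // SerializationException) re-creates the source after a delay
  // instead of ending the whole stream.
  def restartingSource(
      settings: ConsumerSettings[String, String],
      topicName: String): Source[ConsumerRecord[String, String], NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 3.seconds,   // assumed value: first retry delay
      maxBackoff = 30.seconds,  // assumed value: upper bound for the delay
      randomFactor = 0.2        // assumed value: jitter added to each delay
    ) { () =>
      // The factory is called on every (re)start, so a fresh consumer is created.
      Consumer.plainSource(settings, Subscriptions.topics(topicName))
    }
}
```

It would be used in place of the plain source, for example `RestartingConsumer.restartingSource(buildConsumerSettings(consumerConfig), topicName)`. One caveat: a restart alone does not skip the record that failed to deserialize, so unless the consumer's offset moves past it, the restarted source may hit the same record again.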

Alternatively, I could create the KafkaConsumerActor myself and then subscribe to the topic in the pipeline, as they do here: https://github.com/kciesielski/reactive-kafka/blob/7dd29719a5d1c6eb70584e67bf9f4dbd7b6d2b39/docs/src/test/scala/sample/scaladsl/ConsumerExample.scala#L328
Then maybe the supervision decision would be applied.
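For reference, a rough sketch of that second idea, modelled on the linked example. The object name `ExternalConsumer`, the method names, and the `String` key/value types are hypothetical. Note that `Consumer.plainExternalSource` takes a manual subscription, so the sketch assigns a specific partition rather than using `Subscriptions.topics`:

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.{ConsumerSettings, KafkaConsumerActor, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, not part of the original code.
object ExternalConsumer {

  // Creates the consumer actor explicitly instead of letting the source
  // create it, so the actor's lifecycle is managed by the caller.
  def consumerActor(
      system: ActorSystem,
      settings: ConsumerSettings[String, String]): ActorRef =
    system.actorOf(KafkaConsumerActor.props(settings))

  // Builds a source that reads one partition through the given consumer actor.
  def partitionSource(
      consumer: ActorRef,
      topicName: String,
      partition: Int): Source[ConsumerRecord[String, String], Consumer.Control] =
    Consumer.plainExternalSource[String, String](
      consumer,
      Subscriptions.assignment(new TopicPartition(topicName, partition))
    )
}
```

Whether the supervision decision is actually applied with this setup is not something the sketch demonstrates; it only shows how the actor and the source would be wired together.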


Handling SerializationException in an Alpakka Kafka consumer