It looks like I’m running into a variation on the problem reported earlier this month in the post "Kafka consumer actor stop after Serialization exception". I’m using version 0.22 of akka-stream-kafka_2.12 (the Alpakka Kafka connector).
Out of normal diligence, I wanted to make sure my Kafka consumer was correctly handling an org.apache.kafka.common.errors.SerializationException. Specifically, if it got a bad message that the consumer couldn’t read, it should skip that message and go on to the next one.
I’m adding attributes to my Sources and Flows to apply custom exception handling via a Supervision.Decider. I’m using Consumer.committablePartitionedSource, and, not knowing which Source would be the applicable one in this case, I made the Supervision.Decider for both my root source and my per-partition source catch a SerializationException and do a Supervision.Resume.
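For reference, the wiring looks roughly like the following. This is a sketch rather than my actual code: the bootstrap servers, group id, topic name, StringDeserializer, breadth, and the trivial Sink are all placeholders.

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("FooInboundAlpakkaKafkaConsumer")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Skip the offending element on a SerializationException; fail otherwise.
val decider: Supervision.Decider = {
  case _: SerializationException => Supervision.Resume
  case _                         => Supervision.Stop
}

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("foo-consumer-group")

Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("FooInbound"))
  // Decider on the root source (the stream of per-partition sources) ...
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .flatMapMerge(breadth = 16, {
    case (_, partitionSource) =>
      partitionSource
        // ... and on each per-partition source as well.
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
  })
  .runWith(Sink.ignore)
```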
So I went and ran a smoke test that would deliberately trigger a SerializationException in my value deserializer.
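(The smoke test amounts to publishing a record whose value isn’t valid JSON, e.g. with the console producer that ships with Kafka; the broker address is a placeholder.)

```shell
# Publish a value that the JSON value deserializer cannot parse.
echo 'random text' | kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic FooInbound
```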
Much to my surprise, the custom supervision strategy for either Source was bypassed, and instead I got an exception that failed the whole stream:
2018-09-14 10:41:32.188 ERROR 22879 --- [lt-dispatcher-2] akka.kafka.internal.KafkaConsumerActor : Exception when polling from consumer
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition FooInbound-3 at offset 17. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data … from topic [FooInbound]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'random': was expecting ('true', 'false' or 'null')
 at [Source: (byte)"random text"; line: 1, column: 8]
…
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.2.jar!/:na]
…
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.2.jar!/:na]
	at akka.kafka.internal.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:348) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:423) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:311) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:191) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[akka-actor_2.12-2.5.13.jar!/:2.5.13]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[akka-actor_2.12-2.5.13.jar!/:2.5.13]
	at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:120) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
…
Then, in the parent vanilla actor that runs my Akka Streams RunnableGraph, I observed that the Akka Streams execution was failing with:
akka.kafka.ConsumerFailed: Consumer actor terminated
I also observed that the subsequent, valid messages the Kafka consumer had picked up for that partition in the same polling interval failed in the same way, with a dead-letters message for each of them:
2018-09-14 10:41:32.195 INFO 22879 --- [t-dispatcher-35] akka.actor.RepointableActorRef : Message [akka.kafka.internal.KafkaConsumerActor$Internal$RequestMessages] from Actor[akka://FooInboundAlpakkaKafkaConsumer/system/StreamSupervisor-0/$$c#-275126390] to Actor[akka://FooInboundAlpakkaKafkaConsumer/system/kafka-consumer-1#696869527] was not delivered. dead letters encountered. If this is not an expected behavior, then [Actor[akka://FooInboundAlpakkaKafkaConsumer/system/kafka-consumer-1#696869527]] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
In my book, the ability to gracefully handle a SerializationException, and not fail on it, is a basic, table-stakes feature of any Kafka consumer. (I have an alternate Kafka consumer built on Spring Cloud Stream that I can switch to via configuration. That implementation handles the SerializationException exactly as one would expect.)
At this point, the only way I can think of to justify going to production with the Alpakka Kafka connector is to configure ByteArrayDeserializer for both deserializers and do the real deserialization in my own code. Clearly this is a hack, and I’m not even sure offhand whether it will remain feasible once I start needing to use the KafkaAvroDeserializer from Confluent.
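Concretely, the workaround I have in mind would look something like this. Again a sketch only: the topic, group id, and Jackson usage are assumptions about my setup, and I’ve used the plain committableSource for brevity.

```scala
import scala.util.{Failure, Success, Try}

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.ByteArrayDeserializer

implicit val system: ActorSystem = ActorSystem("FooInboundAlpakkaKafkaConsumer")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val mapper = new ObjectMapper()

// Hand Kafka a deserializer that can never fail, and do the real parsing
// inside the stream, where I control the failure mode.
val consumerSettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("foo-consumer-group")

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("FooInbound"))
  .map(msg => (Try(mapper.readTree(msg.record.value)), msg.committableOffset))
  .map {
    case (Success(json), offset) =>
      // Process the good record here.
      offset
    case (Failure(_), offset) =>
      // Log and skip the bad record, but still hand its offset on so the
      // consumer doesn't re-read it after a restart.
      offset
  }
  .mapAsync(1)(offset => offset.commitScaladsl())
  .runWith(Sink.ignore)
```

One design note: wrapping the parse in a Try keeps the bad record as a stream element instead of a thrown exception, so its offset can still be committed; a Supervision.Resume on a thrown exception would drop the element and its offset together, and the bad record would come back on the next restart.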
Does anyone have any better ideas for how to deal with a SerializationException in an Akka Streams Kafka consumer?