Handling SerializationException in an Alpakka Kafka consumer

It looks like I’m running into a variation of the problem reported earlier this month in the post “Kafka consumer actor stop after Serialization exception”. I’m using version 0.22 of akka-stream-kafka_2.12 (the Alpakka Kafka connector).

As a matter of routine diligence, I wanted to verify that my Kafka consumer correctly handles an org.apache.kafka.common.errors.SerializationException: if it receives a message that it can’t deserialize, it should skip that message and move on to the next one.

I’m adding attributes to my Sources and Flows to apply custom exception handling via a Supervision.Decider. I’m using Consumer.committablePartitionedSource and, not knowing which Source would be the applicable one in this case, I made the Supervision.Decider for both my root source and my per-partition source catch a SerializationException and return Supervision.Resume.

So I went and ran a smoke test that would deliberately trigger a SerializationException on my value deserializer.

Much to my surprise, the custom supervision strategy was bypassed on both Sources, and instead I got an exception that failed the whole stream:

2018-09-14 10:41:32.188 ERROR 22879 --- [lt-dispatcher-2] akka.kafka.internal.KafkaConsumerActor : Exception when polling from consumer
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition FooInbound-3 at offset 17. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data … from topic [FooInbound]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'random': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"random text"; line: 1, column: 8]
…
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.2.jar!/:na]
…
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.2.jar!/:na]
at akka.kafka.internal.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:348) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:423) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:311) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:191) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[akka-actor_2.12-2.5.13.jar!/:2.5.13]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[akka-actor_2.12-2.5.13.jar!/:2.5.13]
at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:120) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
…

Then in my parent vanilla actor that runs my Akka Streams RunnableGraph, I observed that the Akka Streams execution was failing, due to:

akka.kafka.ConsumerFailed: Consumer actor terminated 

I also observed that the subsequent, valid messages that the Kafka consumer had picked up for that partition in the same polling interval failed in the same way, and I got a dead-letters message for each of them:

2018-09-14 10:41:32.195 INFO 22879 --- [t-dispatcher-35] akka.actor.RepointableActorRef : Message [akka.kafka.internal.KafkaConsumerActor$Internal$RequestMessages] from Actor[akka://FooInboundAlpakkaKafkaConsumer/system/StreamSupervisor-0/$$c#-275126390] to Actor[akka://FooInboundAlpakkaKafkaConsumer/system/kafka-consumer-1#696869527] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://FooInboundAlpakkaKafkaConsumer/system/kafka-consumer-1#696869527]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

In my book, the ability to gracefully handle a SerializationException, and not fail on it, is a basic, table-stakes feature of any Kafka consumer. (I have an alternate Kafka consumer that uses Spring Cloud Stream that I’m able to switch to via configuration. That implementation handles the SerializationException exactly as one would expect.)

At this point, the only way I can think of to justify going to production with the Alpakka Kafka connector is to configure the akka.kafka.ConsumerSettings with ByteArrayDeserializer for the deserializers and do the real deserialization in my own code. Clearly, this is a hack, and I’m not even sure offhand if it’s going to be feasible once I start needing to use the KafkaAvroDeserializer from Confluent.

Does anyone have any better ideas how to deal with SerializationException in an Akka Streams Kafka consumer?

OK, I see now that this is the expected behavior. From “Error handling” in the Alpakka Kafka documentation:

(de)serialization

If reading from Kafka failure is caused by other reasons, like deserialization problems, then the stage will fail immediately. If you expect such cases, consider consuming raw byte arrays and deserializing in a subsequent map stage where you can use supervision to skip failed elements. See also the “At least once” page for more suggestions.

Consuming raw byte arrays and then doing the real deserialization manually is exactly what I ended up doing.
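
For anyone landing here, a minimal sketch of the "deserialize it yourself and skip the failures" idea in plain Java, with no Alpakka dependency so it can be run as-is. Everything in it is an assumption made for illustration: the class RawRecordDecoding, the helpers tryDecode and decodeAll, and the stand-in parser that treats the payload as a UTF-8 integer (the real one would be the Jackson or Avro call). In the actual stream, the raw byte[] values would come from a consumer configured with ByteArrayDeserializer, and tryDecode would be called from a stage downstream of the source, where a failure can be dropped instead of killing the consumer actor.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Alpakka Kafka or kafka-clients.
final class RawRecordDecoding {

    // Applies the parser to the raw bytes and turns any parse failure into an
    // empty Optional instead of letting the exception propagate.
    static <T> Optional<T> tryDecode(byte[] raw, Function<byte[], T> parser) {
        try {
            return Optional.ofNullable(parser.apply(raw));
        } catch (RuntimeException e) {
            // A real consumer would log the failure here before skipping the record.
            return Optional.empty();
        }
    }

    // Stand-in parser (assumption): the payload is a UTF-8 encoded decimal integer.
    static Integer parseInt(byte[] raw) {
        return Integer.valueOf(new String(raw, StandardCharsets.UTF_8).trim());
    }

    // Decodes every raw value, keeping the good ones in order and dropping the bad ones.
    static List<Integer> decodeAll(List<byte[]> rawValues) {
        List<Integer> decoded = new ArrayList<>();
        for (byte[] raw : rawValues) {
            tryDecode(raw, RawRecordDecoding::parseInt).ifPresent(decoded::add);
        }
        return decoded;
    }

    public static void main(String[] args) {
        List<byte[]> rawValues = Arrays.asList(
            "42".getBytes(StandardCharsets.UTF_8),
            "random text".getBytes(StandardCharsets.UTF_8), // the poison message
            "7".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeAll(rawValues)); // prints [42, 7]
    }
}
```

Returning an Optional rather than throwing means the skip does not depend on a supervision strategy at all; throwing from the map stage and resuming via a Supervision.Decider on that stage, as the documentation suggests, is the other way to get the same result.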