Dead letters after 30 min to Kafka producer plainSink

Hi,

I keep getting dead letters after about 30 minutes of running when sending messages from an actor to a Producer.plainSink.

// Materializes an ActorRef; messages sent to it flow into the Kafka producer sink.
ActorRef outTopicWriterActor = Source.actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
                .map(json -> new ProducerRecord<String, String>(outtopic, json.toJSONString()))
                .to(Producer.plainSink(producerSettings))
                .run(materializer);

ActorRef analyzerActor = ClusterSharding.get(system).start(
                "AnalyzerActor",
                Props.create(AnalyzerActor.class, outTopicWriterActor),
                ClusterShardingSettings.create(system),
                messageExtractor);


// In the AnalyzerActor:
actorRef.tell(output, getSelf());


After about 30 minutes, no more messages are sent by the Kafka producer and 'dead letters' occur.

It seems to be caused by “Stage completed / Producer closed”, which stops the ActorRefSource:

2019-04-01 12:30:52.920UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.k.i.DefaultProducerStage DefaultProducerStage(akka://Analyzer) - Stage completed
2019-04-01 12:30:52.920UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.k.i.DefaultProducerStage DefaultProducerStage(akka://Analyzer) - Producer closed
2019-04-01 12:30:52.934UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.s.i.ActorRefSourceActor akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource - stopped
2019-04-01 12:30:53.090UTC INFO [Analyzer-akka.actor.default-dispatcher-24] a.a.RepointableActorRef akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource - Message [data.Evaluation] from Actor[akka://Analyzer/system/sharding/AnalyzerActor/38/1683109215-939#1340984343] to Actor[akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource#-195168168] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource#-195168168]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Any ideas why the producer stage completes?
How can I prevent it?

Thanks!

No, there is nothing in your code that explains it. Is there anything that could fail? (e.g. toJSONString)

Have you tried adding a log operator? https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/log.html
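For example, something like this (an untested sketch based on your snippet; "out-topic-writer" is just a label, and the attributes simply pin the log levels for elements, completion, and failure explicitly):

ActorRef outTopicWriterActor = Source.actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
        .map(json -> new ProducerRecord<String, String>(outtopic, json.toJSONString()))
        .log("out-topic-writer")
        // explicit log levels: elements at DEBUG, completion at INFO, failures at ERROR
        .withAttributes(Attributes.createLogLevels(
                Logging.DebugLevel(), Logging.InfoLevel(), Logging.ErrorLevel()))
        .to(Producer.plainSink(producerSettings))
        .run(materializer);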

Cheers,
Enno.


I finally found the cause in a debug line:
“org.apache.kafka.common.errors.RecordTooLargeException: The message is 1126954 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.”
This exception caused the stage to complete.

Maybe this should surface as a clear error in Alpakka.
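For reference, the client-side limit can be raised via ProducerSettings; a minimal sketch (the values here are placeholders, and the broker-side message.max.bytes must allow larger records as well):

ProducerSettings<String, String> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
                .withBootstrapServers("localhost:9092") // placeholder
                // allow records above the ~1 MB default; 2 MB is an arbitrary example
                .withProperty("max.request.size", "2097152");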

Thanks.

Great that you found the cause.
You could have seen it in the stream completion future by keeping the materialized value with

.toMat(Producer.plainSink(producerSettings), Keep.both())

Please try and report if it shows as a failure on the CompletionStage.
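A minimal sketch of how that could look (untested; JSONObject stands in for whatever element type your source emits):

Pair<ActorRef, CompletionStage<Done>> matValues =
        Source.<JSONObject>actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
                .map(json -> new ProducerRecord<String, String>(outtopic, json.toJSONString()))
                .toMat(Producer.plainSink(producerSettings), Keep.both())
                .run(materializer);

ActorRef outTopicWriterActor = matValues.first();

// The CompletionStage completes exceptionally when the stage fails,
// so a RecordTooLargeException should surface here.
matValues.second().whenComplete((done, failure) -> {
    if (failure != null) {
        system.log().error(failure, "Producer stream failed");
    }
});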

Cheers,
Enno.

Thank you.

I get the producer exceptions only as DEBUG logs, not as errors.

2019-04-08 DEBUG[kafka-producer-network-thread | producer-1] o.a.k.c.n.Selector - [Producer clientId=producer-1] Connection with /10.8.150.226 disconnected
java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_60]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_60]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_60]
    at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_60]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_60]
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:425) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-2.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
2019-04-08 DEBUG[kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient - [Producer clientId=producer-1] Node 4 disconnected.
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_test-6 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_test-2 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 07:35:51.617UTC DEBUG[RTCPAnalyzer-akka.actor.default-dispatcher-16] a.k.i.DefaultProducerStage DefaultProducerStage(akka://RTCPAnalyzer) - Stage completed
2019-04-08 07:35:51.617UTC DEBUG[RTCPAnalyzer-akka.actor.default-dispatcher-16] a.k.i.DefaultProducerStage DefaultProducerStage(akka://RTCPAnalyzer) - Stage completed

Is there a way to make them appear as ERROR in the logging?

The documentation does state:

Retry handling for producers is built-in into Kafka. In case of failure when sending a message, an exception will be thrown, which should fail the stream.

But how can I build a RestartSource in this setup?

RestartSource.onFailuresWithBackoff(Duration.ofSeconds(3), Duration.ofSeconds(30), 0.2,
        () -> Source.fromSourceCompletionStage(outTopicWriterActor));

Something like that?

You could use Producer.flexiFlow within the RestartSource and close the stream with Sink.ignore.
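A rough sketch of that idea (untested; the AtomicReference is a hypothetical way to hand the freshly materialized ActorRef to the sharded actor, since each restart creates a new one):

// Hypothetical holder: senders must look up the latest ActorRef here,
// because every restart materializes a new Source.actorRef.
AtomicReference<ActorRef> currentWriter = new AtomicReference<>();

RestartSource.onFailuresWithBackoff(
        Duration.ofSeconds(3),  // min backoff
        Duration.ofSeconds(30), // max backoff
        0.2,                    // random jitter factor
        () -> Source.<JSONObject>actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
                .mapMaterializedValue(ref -> { currentWriter.set(ref); return ref; })
                .map(json -> ProducerMessage.single(
                        new ProducerRecord<String, String>(outtopic, json.toJSONString())))
                .via(Producer.<String, String, NotUsed>flexiFlow(producerSettings)))
        .runWith(Sink.ignore(), materializer);

The RestartSource wraps the whole inner stream, so a producer failure recreates both the Source.actorRef and the flexiFlow with backoff instead of terminating for good.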

Enno.