After 30 min no more messages are sent by the Kafka producer and ‘dead letters’ occur.
It seems to be caused by “Stage completed / Producer closed” which stops the ActorRefSource.
2019-04-01 12:30:52.920UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.k.i.DefaultProducerStage DefaultProducerStage(akka://Analyzer) - Stage completed
2019-04-01 12:30:52.920UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.k.i.DefaultProducerStage DefaultProducerStage(akka://Analyzer) - Producer closed
2019-04-01 12:30:52.934UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.s.i.ActorRefSourceActor akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource - stopped
2019-04-01 12:30:53.090UTC INFO [Analyzer-akka.actor.default-dispatcher-24] a.a.RepointableActorRef akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource - Message [data.Evaluation] from Actor[akka://Analyzer/system/sharding/AnalyzerActor/38/1683109215-939#1340984343] to Actor[akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource#-195168168] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource#-195168168]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Any ideas why the producer stage completes?
How can I prevent it?
I finally saw in a debug line:
“org.apache.kafka.common.errors.RecordTooLargeException: The message is 1126954 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.”
which caused the stage to complete.
I get the producer exceptions only as debug log, not as error.
2019-04-08 DEBUG[kafka-producer-network-thread | producer-1] o.a.k.c.n.Selector - [Producer clientId=producer-1] Connection with /10.8.150.226 disconnected
java.io.IOException: An existing connection was forcibly closed by the remote host
·at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_60]
·at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_60]
·at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_60]
·at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_60]
·at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_60]
·at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104) ~[kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94) ~[kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) ~[kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) ~[kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) [kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) [kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.common.network.Selector.poll(Selector.java:425) [kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) [kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.0.0.jar:na]
·at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-2.0.0.jar:na]
·at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
2019-04-08 DEBUG[kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient - [Producer clientId=producer-1] Node 4 disconnected.
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_test-6 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_test-2 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 WARN [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition rtcp_analyzer_diag-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-04-08 07:35:51.617UTC DEBUG[RTCPAnalyzer-akka.actor.default-dispatcher-16] a.k.i.DefaultProducerStage DefaultProducerStage(akka://RTCPAnalyzer) - Stage completed
2019-04-08 07:35:51.617UTC DEBUG[RTCPAnalyzer-akka.actor.default-dispatcher-16] a.k.i.DefaultProducerStage DefaultProducerStage(akka://RTCPAnalyzer) - Stage completed
Is there a way to make them appear as ERROR in the logging?
Although the documentation states:
Retry handling for producers is built-in into Kafka. In case of failure when sending a message, an exception will be thrown, which should fail the stream.
But how can I build a RestartSource is this setup?