Consumer interrupted with WakeupException after timeout. Message: null


#1

Hi using lagom 1.4.11 on kubernetes. Kafka is working all event processors are working fine.
But if I using a topic I get all the time this error.

Consumer interrupted with WakeupException after timeout. Message: null.

My log is here:

2019-03-12T09:47:29.015Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-29, akkaTimestamp=09:47:29.015UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:29.126Z [warn] akka.remote.EndpointReader [sourceThread=application-akka.remote.default-remote-dispatcher-32, akkaTimestamp=09:47:29.126UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fapplication%40*.*.*.*%3A2552-1/endpointWriter/endpointReader-akka.tcp%3A%2F%2Fapplication%40*.*.*.*%3A2552-0, sourceActorSystem=application] - Discarding inbound message to [unknown] in read-only association to [akka.tcp://application@*.*.*.*:2552]. If this happens often you may consider using akka.remote.use-passive-connections=off or use Artery TCP.
2019-03-12T09:47:32.105Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-36, akkaTimestamp=09:47:32.105UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:35.195Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-52, akkaTimestamp=09:47:35.194UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:38.285Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-54, akkaTimestamp=09:47:38.284UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:41.375Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-56, akkaTimestamp=09:47:41.375UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:44.465Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-58, akkaTimestamp=09:47:44.465UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:47.555Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-59, akkaTimestamp=09:47:47.554UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:50.646Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-24, akkaTimestamp=09:47:50.645UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:53.740Z [error] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-28, akkaTimestamp=09:47:53.738UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - WakeupException limit exceeded, stopping.
2019-03-12T09:47:53.755Z [error] com.lightbend.lagom.internal.broker.kafka.KafkaSubscriberActor [sourceThread=application-akka.actor.default-dispatcher-19, akkaTimestamp=09:47:53.754UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/user/KafkaBackoffConsumer1-topic-item-created/KafkaConsumerActor1-topic-item-created, sourceActorSystem=application] - Topic subscription interrupted due to failure: [akka.kafka.ConsumerFailed: Consumer actor terminated]
2019-03-12T09:47:53.772Z [error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-19, akkaTimestamp=09:47:53.765UTC, akkaSource=akka://application/user/KafkaBackoffConsumer1-topic-item-created/KafkaConsumerActor1-topic-item-created, sourceActorSystem=application] - Consumer actor terminated
akka.kafka.ConsumerFailed: Consumer actor terminated
	at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)
	at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)
	at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:230)
	at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:198)
	at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:198)
	at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:454)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:468)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

My topic is here:

topic("topic-item-created", createdItemTopic ).
        addProperty(
          KafkaProperties.partitionKeyStrategy,
          PartitionKeyStrategy[StorageItemTopic](_.itemId)
        ),
      topic("item-issued-topic", issuedItemTopic )

The called topic is “topic-item-created”.
What could be the problem. Can someone help me?
Thank you


#2

In cassandra I found this following entries

topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent0 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent1 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent2 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent3 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent4 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent5 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent6 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent7 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent8 |           null |                                 null
          topicProducer-item-issued-topic |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent9 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent0 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent1 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent2 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent3 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent4 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent5 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent6 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent7 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent8 |           null |                                 null
         topicProducer-topic-item-created |   de.scalamat.storageManagement.impl.storageItem.event.ItemEvent9 |           null |                                 null

(Alan Klikic) #3

Hi,

Cassandra offfsetstore table has timeuuidoffset null for all record meaning that that nothing was published yet to any of the topics.
Can you check if kafka_native is configured correctly and if yes, can you connect to kafka from your pod?

Br,
Alan


#4

How can I check? I have access with kubectl to all pods.
I am in cassandra at my kubernetes. I the that my event processor table works fine and has a number of entries. Is that a sign for working kafka_native?


(Alan Klikic) #5

If you are referring to Cassandra offsetstore table, table is always populated with topic producer/readside processor name and tag names.
But timeuuidoffset in your case is null. timeuuidoffset is indicating last processed event offset per tag.

For testing connectivity:
Lagom base docker image (openjdk:8-jre-alpine) does not have telnet pre-installed but you could use wget.

Do this:

kubectl -n <your namespace> exec -ti <your pod name> -- wget http://<kafka ip>:9092

If you get something like this:

Connecting to 10.0.2.97:9092 (10.0.2.97:9092)
wget: server returned error: HTTP/1.1 404 Not Found
command terminated with exit code 1

then connectivity is there.


#6

Thanks. I think the connectivity is not there.

kubectl -n avalon exec -ti airservice-5f64b97b59-7bflh -- wget http://kafka.avalon.svc.cluster.local:9092
Connecting to kafka.avalon.svc.cluster.local:9092 (10.***.***.***:9092)
wget: error getting response: Connection reset by peer
command terminated with exit code 1

also with ip and an other pod

andre@andre:~$ kubectl -n avalon exec -ti storagemanagement-6fb7449dbd-n5ndn -- wget http://10.***.***.***:9092
Connecting to 10.***.***.***:9092 (10.***.***.***:9092)
wget: error getting response: Connection reset by peer
command terminated with exit code 1

#7

I think I have the incorrect broker. My conf:

lagom.broker.kafka {
  # The name of the Kafka service to look up out of the service locator.
  # If this is an empty string, then a service locator lookup will not be done,
  # and the brokers configuration will be used instead.
  service-name = ""
  service-name = ${?KAFKA_SERVICE_NAME}

  # The URLs of the Kafka brokers. Separate each URL with a comma.
  # This will be ignored if the service-name configuration is non empty.
  #brokers = ${lagom.broker.defaults.kafka.brokers}
  brokers = "kafka.avalon.svc.cluster.local:9092"

  client {
    default {
      # Exponential backoff for failures
      failure-exponential-backoff {
        # minimum (initial) duration until processor is started again
        # after failure
        min = 3s

        # the exponential back-off is capped to this duration
        max = 30s

        # additional random delay is based on this factor
        random-factor = 0.2
      }
    }

    # configuration used by the Lagom Kafka producer
    producer = ${lagom.broker.kafka.client.default}
    producer.role = ""

    # configuration used by the Lagom Kafka consumer
    consumer {
      failure-exponential-backoff = ${lagom.broker.kafka.client.default.failure-exponential-backoff}

      # The number of offsets that will be buffered to allow the consumer flow to
      # do its own buffering. This should be set to a number that is at least as
      # large as the maximum amount of buffering that the consumer flow will do,
      # if the consumer buffer buffers more than this, the offset buffer will
      # backpressure and cause the stream to stop.
      offset-buffer = 100

      # Number of messages batched together by the consumer before the related messages'
      # offsets are committed to Kafka.
      # By increasing the batching-size you are trading speed with the risk of having
      # to re-process a larger number of messages if a failure occurs.
      # The value provided must be strictly greater than zero.
      batching-size = 20

      # Interval of time waited by the consumer before the currently batched messages'
      # offsets are committed to Kafka.
      # This parameter is useful to ensure that messages' offsets are always committed
      # within a fixed amount of time.
      # The value provided must be strictly greater than zero.
      batching-interval = 5 seconds
    }
  }
}

lagom.services {
  cas_native = "_cql._tcp.cassandra.avalon.svc.cluster.local"
}

I change my config add set the service-name to kafka (name of the service in k8)
I got this error:

2019-03-12T17:15:48.885Z [error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-14, akkaTimestamp=17:15:48.884UTC, akkaSource=akka://application/system/sharding/kafkaProducer-item-issued-topic/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent0/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent0/producer, sourceActorSystem=application] - requirement failed: missing port in //10.233.54.201
java.lang.IllegalArgumentException: requirement failed: missing port in //10.***.54.201
	at scala.Predef$.require(Predef.scala:277)
	at com.lightbend.lagom.internal.api.UriUtils$.hostAndPort(UriUtils.scala:20)
	at com.lightbend.lagom.internal.api.UriUtils$.$anonfun$hostAndPorts$1(UriUtils.scala:14)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:389)
	at scala.collection.TraversableLike.map(TraversableLike.scala:234)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
	at scala.collection.immutable.List.map(List.scala:295)
	at com.lightbend.lagom.internal.api.UriUtils$.hostAndPorts(UriUtils.scala:14)
	at com.lightbend.lagom.internal.broker.kafka.Producer$TaggedOffsetProducerActor$$anonfun$receive$1.$anonfun$applyOrElse$2(Producer.scala:97)
	at scala.util.Success.$anonfun$map$1(Try.scala:251)
	at scala.util.Success.map(Try.scala:209)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:289)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

(Alan Klikic) #8

Is there a reason why you configure broker endpoints in application.conf?
Check Online Auction example and Lagom kubernetes setup considerations


#9

Sure. Without that I get the error (see my edit).

requirement failed: missing port in //10.***.54.201

I try now to remove the lagom.broker.kafka settings in my conf (like online auction example) and give the external_services param in run command.


#10

I change my application conf. Removed kafka settings. I added to my rp command the external_service for kafka.
Result

2019-03-12T18:04:31.550Z [error] com.lightbend.lagom.internal.broker.kafka.Producer$TaggedOffsetProducerActor [sourceThread=application-akka.actor.default-dispatcher-3, akkaTimestamp=18:04:31.550UTC, akkaSource=akka.tcp://application@10.***.75.17:2552/system/sharding/kafkaProducer-item-issued-topic/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent5/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent5/producer, sourceActorSystem=application] - Unable to locate Kafka service named [kafka_native]

(Alan Klikic) #11

Did you config kafka k8s service? Check second link for configuring headless service for kafka.


#12
rp generate-kubernetes-resources "docker-registry.****.info/storagemanagement-impl:0.105-SNAPSHOT"  \
--pod-controller-replicas 3 --generate-all \
--deployment-type rolling \
--external-service "cas_native=_cql._tcp.cassandra.avalon.svc.cluster.local" \
--external-service "kafka_native=_broker._tcp.kafka.avalon.svc.cluster.local" \
--pod-controller-image-pull-policy Always --ingress-annotation kubernetes.io/ingress.class=traefik \
--namespace avalon --env JAVA_OPTS="-Dplay.http.secret.key=...... -Dplay.filters.hosts.allowed.0=." | kubectl apply -f -

SVC:

kafka                                    ClusterIP   10.***.54.201   <none>        9092/TCP                                       21d
kafka-headless                           ClusterIP   None            <none>        9092/TCP                                       21d
kafka-zookeeper                          ClusterIP   10.***.21.50    <none>        2181/TCP,2888/TCP,3888/TCP                     21d
kafka-zookeeper-headless                 ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP                     21d

What for a config you. I did not setup the server. I have here the values.yml


(Alan Klikic) #13

Did you configure RP service locator?


#14

I think yes.

class AIRServiceLoader extends LagomApplicationLoader {

  //Lädt den entwicklungsmodus!
  override def loadDevMode(context: LagomApplicationContext): LagomApplication = {
    new AIRServiceApplication(context) with LagomDevModeComponents
  }

  override def load(context: LagomApplicationContext): LagomApplication = {
    new AIRServiceApplication(context) with LagomServiceLocatorComponents
  }

  override def describeService = Some(readDescriptor[ArtifactIdentifierRelationService])

}

But I have no lookupOne. The µService can communicate with each other.


(Alan Klikic) #15

Can you share kafka headless describe?


#16

of course.

andre@andre-home:~$ kubectl -n avalon describe svc kafka-headless
Name:              kafka-headless
Namespace:         avalon
Labels:            app=kafka
                   chart=kafka-1.3.0
                   heritage=Tiller
                   release=kafka
Annotations:       <none>
Selector:          app=kafka,release=kafka
Type:              ClusterIP
IP:                None
Port:              kafka  9092/TCP
TargetPort:        kafka/TCP
Endpoints:         10.***.74.107:9092
Session Affinity:  None
Events:            <none>

(Alan Klikic) #17

k8s DNS SRV format is:

_my-port-name._my-port-protocol.my-svc.my-namespace.svc.cluster.local

So in your case it should be:

"kafka_native=_kafka._tcp.kafka-headless.avalon.svc.cluster.local"


#18

damn. Okay my mistake. Sorry.

I correct them and great the errors with no connection are gone. But still termination :frowning:

2019-03-12T19:22:24.277Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-58, akkaTimestamp=19:22:24.277UTC, akkaSource=akka.tcp://application@10.***.74.118:2552/system/kafka-consumer-2, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T19:22:28.318Z [error] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-60, akkaTimestamp=19:22:28.317UTC, akkaSource=akka.tcp://application@10.***.74.118:2552/system/kafka-consumer-2, sourceActorSystem=application] - WakeupException limit exceeded, stopping.
2019-03-12T19:22:28.323Z [error] com.lightbend.lagom.internal.broker.kafka.KafkaSubscriberActor [sourceThread=application-akka.actor.default-dispatcher-14, akkaTimestamp=19:22:28.322UTC, akkaSource=akka.tcp://application@10.***.74.118:2552/user/KafkaBackoffConsumer1-topic-item-created/KafkaConsumerActor1-topic-item-created, sourceActorSystem=application] - Topic subscription interrupted due to failure: [akka.kafka.ConsumerFailed: Consumer actor terminated]
2019-03-12T19:22:28.325Z [error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-3, akkaTimestamp=19:22:28.323UTC, akkaSource=akka://application/user/KafkaBackoffConsumer1-topic-item-created/KafkaConsumerActor1-topic-item-created, sourceActorSystem=application] - Consumer actor terminated
akka.kafka.ConsumerFailed: Consumer actor terminated
	at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)
	at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)
	at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:230)
	at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:198)
	at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:198)
	at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:454)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:468)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


#19

This error comes from airservice which only consume (until now) the artifactservice which provides the topic has this error.

2019-03-12T19:27:40.272Z [error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-4, akkaTimestamp=19:27:40.270UTC, akkaSource=akka://application/system/sharding/kafkaProducer-item-issued-topic/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent3/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent3/producer, sourceActorSystem=application] - Ask timed out on [Actor[akka://application/system/sharding/ItemEntity#-714333201]] after [5000 ms]. Message of type [com.lightbend.lagom.scaladsl.persistence.CommandEnvelope]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/system/sharding/ItemEntity#-714333201]] after [5000 ms]. Message of type [com.lightbend.lagom.scaladsl.persistence.CommandEnvelope]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:866)
	at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
	at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:864)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
	at java.lang.Thread.run(Thread.java:748)

I think i see the error.

topic("item-issued-topic", issuedItemTopic )

this was my declartion before now I changed to

topic("item-issued-topic", issuedItemTopic ).
        addProperty(
          KafkaProperties.partitionKeyStrategy,
          PartitionKeyStrategy[StorageItemTopic](_.itemId)
        )

Because my created has no problems and is already defined this way. I will have a look

I tried also the connectivity test. The same response as before.


#20

At producer side I get a few interesting errors.

2019-03-13T07:31:39.499Z [warn] org.apache.kafka.clients.NetworkClient [] - Error while fetching metadata with correlation id 588 : {topic-item-issued=UNKNOWN}
2019-03-13T07:31:39.546Z [warn] org.apache.kafka.clients.NetworkClient [] - Error while fetching metadata with correlation id 1 : {topic-item-issued=UNKNOWN}
2019-03-13T07:31:39.566Z [error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-14, akkaTimestamp=07:31:39.565UTC, akkaSource=akka://application/system/sharding/kafkaProducer-topic-item-issued/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent7/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent7/producer, sourceActorSystem=application] - Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2019-03-13T07:31:39.566Z [error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-16, akkaTimestamp=07:31:39.565UTC, akkaSource=akka://application/system/sharding/kafkaProducer-topic-item-issued/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent8/de.scalamat.storageManagement.impl.storageItem.event.ItemEvent8/producer, sourceActorSystem=application] - Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

this is solved! I renamed my topic id and after reset to the old id name the error not occur.

I started a new µService with

rp generate-kubernetes-resources "docker-registry.****.info/artifactidentifierrelationservice-impl:0.110-SNAPSHOT" \
--pod-controller-replicas 3 --generate-all --deployment-type rolling \
--external-service "cas_native=_cql._tcp.cassandra.avalon.svc.cluster.local" \
--external-service "kafka_native=_kafka._tcp.kafka-headless.avalon.svc.cluster.local" \
--pod-controller-image-pull-policy Always \
--ingress-annotation kubernetes.io/ingress.class=traefik \
--namespace avalon --env JAVA_OPTS="-Dplay.http.secret.key=.... -Dplay.filters.hosts.allowed.0=." | kubectl apply -f -

This Service only consume (yet). But I have the same null messages as before.

Maybe the kafka logs can help.

[2019-03-13 03:40:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 03:50:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 04:00:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 04:10:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 04:11:53,798] WARN Attempting to send response via channel for which there is no open connection, connection id 10.233.74.107:9092-10.233.75.29:54096-132490 (kafka.network.Processor)
[2019-03-13 04:20:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 04:30:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 04:40:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 04:50:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 04:57:31,753] WARN Attempting to send response via channel for which there is no open connection, connection id 10.233.74.107:9092-10.233.75.16:52676-133006 (kafka.network.Processor)
[2019-03-13 05:00:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 05:10:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 05:18:53,867] WARN Attempting to send response via channel for which there is no open connection, connection id 10.233.74.107:9092-10.233.97.140:56174-133253 (kafka.network.Processor)
[2019-03-13 05:20:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 05:30:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 05:40:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 05:50:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 06:00:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 06:10:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 06:20:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 06:30:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 06:40:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 06:50:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 07:00:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 07:10:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 07:20:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 07:30:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 07:40:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 07:50:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 08:00:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 08:10:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 08:20:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 08:30:24,531] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 08:40:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 08:45:53,985] INFO [Log partition=item-issued-topic-0, dir=/opt/bitnami/kafka/data] Found deletable segments with base offsets [18] due to retention time 604800000ms breach (kafka.log.Log)
[2019-03-13 08:45:53,986] INFO [Log partition=item-issued-topic-0, dir=/opt/bitnami/kafka/data] Scheduling log segment [baseOffset 18, size 225162] for deletion. (kafka.log.Log)
[2019-03-13 08:45:53,987] INFO [Log partition=item-issued-topic-0, dir=/opt/bitnami/kafka/data] Incrementing log start offset to 439 (kafka.log.Log)
[2019-03-13 08:46:53,987] INFO [Log partition=item-issued-topic-0, dir=/opt/bitnami/kafka/data] Deleting segment 18 (kafka.log.Log)
[2019-03-13 08:46:53,989] INFO Deleted log /opt/bitnami/kafka/data/item-issued-topic-0/00000000000000000018.log.deleted. (kafka.log.LogSegment)
[2019-03-13 08:46:53,989] INFO Deleted offset index /opt/bitnami/kafka/data/item-issued-topic-0/00000000000000000018.index.deleted. (kafka.log.LogSegment)
[2019-03-13 08:46:53,989] INFO Deleted time index /opt/bitnami/kafka/data/item-issued-topic-0/00000000000000000018.timeindex.deleted. (kafka.log.LogSegment)
[2019-03-13 08:50:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 09:00:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 09:05:53,985] INFO [Log partition=topic-item-created-0, dir=/opt/bitnami/kafka/data] Found deletable segments with base offsets [1425] due to retention time 604800000ms breach (kafka.log.Log)
[2019-03-13 09:05:53,985] INFO [Log partition=topic-item-created-0, dir=/opt/bitnami/kafka/data] Scheduling log segment [baseOffset 1425, size 906290] for deletion. (kafka.log.Log)
[2019-03-13 09:05:53,986] INFO [Log partition=topic-item-created-0, dir=/opt/bitnami/kafka/data] Incrementing log start offset to 7310 (kafka.log.Log)
[2019-03-13 09:06:53,986] INFO [Log partition=topic-item-created-0, dir=/opt/bitnami/kafka/data] Deleting segment 1425 (kafka.log.Log)
[2019-03-13 09:06:53,988] INFO Deleted log /opt/bitnami/kafka/data/topic-item-created-0/00000000000000001425.log.deleted. (kafka.log.LogSegment)
[2019-03-13 09:06:53,988] INFO Deleted offset index /opt/bitnami/kafka/data/topic-item-created-0/00000000000000001425.index.deleted. (kafka.log.LogSegment)
[2019-03-13 09:06:53,988] INFO Deleted time index /opt/bitnami/kafka/data/topic-item-created-0/00000000000000001425.timeindex.deleted. (kafka.log.LogSegment)
[2019-03-13 09:10:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 09:20:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 09:30:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 09:40:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 09:50:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 10:00:24,530] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 10:10:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-13 10:20:24,529] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)