Google Pub/Sub stops pulling randomly

Hello,

I followed the documentation to implement a system that subscribes to the HTTP log stream of my Kubernetes cluster and (eventually) sends each JSON object to another system for analysis.

It works for a few items, then it stops:

[DEBUG] [03/26/2019 10:44:48.682] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (Idle)] Dispatching request [POST /v1/projects/xxxxx-platform/subscriptions/yyyyyy:acknowledge Strict(360 bytes)]
[DEBUG] [03/26/2019 10:44:48.682] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (Idle)] Before event [onNewRequest] In state [Idle] for [0 ms]
[DEBUG] [03/26/2019 10:44:48.682] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (PushingRequestToConnection)] After event [onNewRequest] State change [Idle] -> [PushingRequestToConnection]
[DEBUG] [03/26/2019 10:44:48.682] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (PushingRequestToConnection)] Before event [onRequestDispatched] In state [PushingRequestToConnection] for [0 ms]
[DEBUG] [03/26/2019 10:44:48.682] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponse)] After event [onRequestDispatched] State change [PushingRequestToConnection] -> [WaitingForResponse]
[DEBUG] [03/26/2019 10:44:48.715] [local-akka.actor.default-dispatcher-15] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponse)] Received response
[DEBUG] [03/26/2019 10:44:48.715] [local-akka.actor.default-dispatcher-15] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponse)] Before event [onResponseReceived] In state [WaitingForResponse] for [32 ms]
[DEBUG] [03/26/2019 10:44:48.715] [local-akka.actor.default-dispatcher-15] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponse)] onResponseReceived in WaitingForResponse with false
[DEBUG] [03/26/2019 10:44:48.715] [local-akka.actor.default-dispatcher-15] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponseDispatch)] After event [onResponseReceived] State change [WaitingForResponse] -> [WaitingForResponseDispatch]
[DEBUG] [03/26/2019 10:44:48.715] [local-akka.actor.default-dispatcher-15] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponseDispatch)] Before event [onResponseDispatchable] In state [WaitingForResponseDispatch] for [0 ms]
[DEBUG] [03/26/2019 10:44:48.715] [local-akka.actor.default-dispatcher-15] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponseEntitySubscription)] After event [onResponseDispatchable] State change [WaitingForResponseDispatch] -> [WaitingForResponseEntitySubscription]
[DEBUG] [03/26/2019 10:44:48.716] [local-akka.actor.default-dispatcher-5] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForResponseEntitySubscription)] Before event [onResponseEntitySubscribed] In state [WaitingForResponseEntitySubscription] for [0 ms]
[DEBUG] [03/26/2019 10:44:48.716] [local-akka.actor.default-dispatcher-5] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForEndOfResponseEntity)] After event [onResponseEntitySubscribed] State change [WaitingForResponseEntitySubscription] -> [WaitingForEndOfResponseEntity]
[DEBUG] [03/26/2019 10:44:48.716] [local-akka.actor.default-dispatcher-16] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (WaitingForEndOfResponseEntity)] Before event [onResponseEntityCompleted] In state [WaitingForEndOfResponseEntity] for [0 ms]
[DEBUG] [03/26/2019 10:44:48.716] [local-akka.actor.default-dispatcher-16] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (Idle)] After event [onResponseEntityCompleted] State change [WaitingForEndOfResponseEntity] -> [Idle]
[DEBUG] [03/26/2019 10:44:48.716] [local-akka.actor.default-dispatcher-16] [local/Pool(shared->https://pubsub.googleapis.com:443)] [0 (Idle)] Slot became idle... Trying to pull
[DEBUG] [03/26/2019 10:46:44.207] [local-akka.actor.default-dispatcher-16] [local/Pool(shared->https://www.googleapis.com:443)] Shutting down host connection pool immediately
[DEBUG] [03/26/2019 10:46:44.208] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://www.googleapis.com:443)] Pool upstream was completed
[DEBUG] [03/26/2019 10:46:44.208] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://www.googleapis.com:443)] Pool stopped
[DEBUG] [03/26/2019 10:46:46.745] [local-akka.actor.default-dispatcher-12] [akka://local/system/StreamSupervisor-1/flow-4-0-PoolFlow] Aborting tcp connection to pubsub.googleapis.com:443 because of upstream failure: akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [pubsub.googleapis.com:443], no bytes passed in the last 2 minutes
[DEBUG] [03/26/2019 10:46:46.746] [local-akka.actor.default-dispatcher-16] [akka://local/system/StreamSupervisor-1/flow-7-1] closing output
[DEBUG] [03/26/2019 10:46:46.748] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [1 (Idle)] Connection cancelled
[DEBUG] [03/26/2019 10:46:46.748] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [1 (Idle)] Connection completed
[DEBUG] [03/26/2019 10:46:46.748] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [1 (Idle)] Before event [onConnectionCompleted] In state [Idle] for [120901 ms]
[DEBUG] [03/26/2019 10:46:46.748] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [1 (Unconnected)] After event [onConnectionCompleted] State change [Idle] -> [Unconnected]
[DEBUG] [03/26/2019 10:46:46.748] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] [1 (Unconnected)] State change from [Idle] to [Unconnected]. Closing the existing connection.
[DEBUG] [03/26/2019 10:46:48.735] [local-akka.actor.default-dispatcher-12] [local/Pool(shared->https://pubsub.googleapis.com:443)] Shutting down host connection pool immediately
[DEBUG] [03/26/2019 10:46:48.735] [local-akka.actor.default-dispatcher-5] [local/Pool(shared->https://pubsub.googleapis.com:443)] Pool upstream was completed
[DEBUG] [03/26/2019 10:46:48.735] [local-akka.actor.default-dispatcher-5] [local/Pool(shared->https://pubsub.googleapis.com:443)] Pool stopped

My implementation is very simple and by the book:

  val subscriptionSource: Source[ReceivedMessage, NotUsed] = GooglePubSub.subscribe(subscription, config)

  val ackSink: Sink[AcknowledgeRequest, Future[Done]] = GooglePubSub.acknowledge(subscription, config)

  val processMessage: Sink[ReceivedMessage, NotUsed] = Flow[ReceivedMessage].map{ message =>
    read[GkeLogData](new String(java.util.Base64.getDecoder.decode(message.message.data)))
  }.to(Sink.foreach{ d =>
    println(Parser.anonymiseUuid(d.httpRequest.requestUrl))
  })

  val batchAckSink: Sink[ReceivedMessage, NotUsed] = Flow[ReceivedMessage]
    .map(_.ackId)
    .groupedWithin(5, 10 seconds)
    .map(AcknowledgeRequest.apply)
    .to(ackSink)

  val s: RunnableGraph[NotUsed] = subscriptionSource
    .alsoTo(batchAckSink)
    .to(processMessage)

  s.run()

I wonder whether this has something to do with non-discarded response entities, as the symptoms are the same, but I cannot be sure. Any ideas?

Thank you for your help!

The problem was a swallowed exception in processMessage, which became visible after adding .log("error").
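For readers hitting the same symptom, here is a minimal sketch of how a log stage can surface such a failure. It does not use Pub/Sub at all: the `parse` flow, the `LogStageSketch` object and the sample inputs are hypothetical stand-ins for the decoding step in processMessage, and it assumes Akka Streams 2.5-style materializer setup.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object LogStageSketch {
  // Hypothetical stand-in for the decoding step: "oops".toInt throws,
  // much like a parsing error inside processMessage would.
  val parse: Flow[String, Int, NotUsed] =
    Flow[String]
      .map(_.toInt)
      // By default the log stage reports elements at DEBUG level
      // and an upstream failure at ERROR level.
      .log("error")

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    Source(List("1", "2", "oops", "4"))
      .via(parse)
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate()) // shut down whether it succeeded or failed
  }
}
```

Without the log stage, the failing map would simply tear down the stream with nothing printed, which probably matches the "stops randomly" behaviour described above.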

I assume that finding the exception helped you solve the problem.

Make it a habit to always have a stream completion future (e.g. from Sink.ignore) and to check that it completes without errors.
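A minimal sketch of that habit, not tied to the Pub/Sub connector: `runStream`, `CompletionFutureSketch` and the sample inputs are hypothetical names chosen for illustration, and the setup again assumes an Akka 2.5-style materializer. The point is to keep the sink's materialized Future[Done] (with `toMat` and `Keep.right`) instead of discarding it with `to`, and then to inspect the outcome.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object CompletionFutureSketch {
  // Hypothetical stand-in for the real pipeline; a non-numeric input makes map throw.
  def runStream(inputs: List[String])(implicit mat: Materializer): Future[Done] =
    Source(inputs)
      .map(_.toInt)
      .toMat(Sink.ignore)(Keep.right) // keep the sink's Future[Done] rather than NotUsed
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Wait for the stream to end and look at how it ended.
    val outcome: Try[Done] =
      Await.ready(runStream(List("1", "2", "oops", "4")), 10.seconds).value.get
    println(outcome) // expected to be a Failure carrying the NumberFormatException

    system.terminate()
  }
}
```

In a long-running service one would more likely attach `onComplete` or `recover` to the future than block with `Await`; the blocking call here only keeps the sketch short.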

Cheers,
Enno.