gRPC Stream Cancellation, under what conditions?

In an Akka streaming project we are seeing a very high volume of these error messages:

Error in stage [akka.stream.scaladsl.MergeHub$$anon$2$$anon$3]: Upstream producer failed with exception, removing from MergeHub now

Digging further into the stack trace, what I have determined is that the in-port of the HTTP/2 connection was cancelled ("cancelled" in the HTTP/2 sense) abruptly, most likely by the other side. It is not clear to me whether this is an HTTP/2-specific issue or a gRPC-specific one, but I suspect the root cause is not in Akka Streams itself.

It is also very difficult to reproduce this issue locally; it only seems to occur in a higher environment, with multiple replicas and a high volume of events.

The RPC producer has the following protocol buffer schema:

service EventService {
  rpc publish (stream FooEvent) returns (Received);
}

The producer itself is fairly simple, and the code below is called very frequently:

eventService
  .publish()
  .invoke(Source.single(someNewEvent))

The consuming side is also very straightforward and uses a MergeHub. While this is not strictly Akka gRPC code, it produces the error message above, which points to a root cause in either gRPC or HTTP/2:

class EventServiceImpl[T](mergeHub: Sink[FooEvent, T])(implicit system: ActorSystem) extends EventServicePowerApi {
  override def publish(in: Source[FooEvent, NotUsed], metadata: Metadata): Future[Received] = {
    in runWith mergeHub
    Future.successful(Received())
  }
}

val eventService = new EventServiceImpl(MergeHub.source[FooEvent])

So, once again, my question is this: under what conditions would an HTTP/2 gRPC stream be cancelled? The error message above does not indicate the source of the cancellation, so the root cause has to be determined inductively, by a process of elimination. I am wondering whether there is a way to make that elimination systematic. Any thoughts?

There is something weird in your method implementation, no? Shouldn’t your future response be a mapping on the future completion of the stream? By immediately returning a future, aren’t you already signalling the end?
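
For example, something roughly along these lines. This is only a sketch on my part: alsoToMat with Sink.ignore is just one way to get hold of a completion future, and it assumes the same EventServiceImpl class with its mergeHub, plus an implicit ExecutionContext and Materializer in scope.

// Sketch: send the gRPC reply only once the inbound stream has completed,
// instead of immediately after wiring it into the MergeHub.
override def publish(in: Source[FooEvent, NotUsed], metadata: Metadata): Future[Received] =
  in.alsoToMat(Sink.ignore)(Keep.right) // Future[Done] that completes with the inbound stream
    .to(mergeHub)                       // still drain the events into the hub
    .run()
    .map(_ => Received())               // reply only at that point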

I have since modified the graph to experiment and troubleshoot, but I am still running into the same trouble. It does appear to me to be some kind of race condition:

Source
  .queue[FooEvent](queueSize, OverflowStrategy.backpressure)
  .groupedWithin(bufferFrame.size, bufferFrame.period) // batch events into frames
  .map(Source(_))                                      // each batch becomes one published stream
  .throttle(throttleFrame.size, throttleFrame.period, 4, ThrottleMode.shaping)
  .mapAsyncUnordered(throttleFrame.size) { eventSource =>
    logger.debug("Pushing source {}", eventSource)
    eventService
      .publish()
      .invoke(eventSource)
      .map(_ => eventSource)
  }
  .to(Sink.ignore)
  .run()

The use of throttle seems to help, but only at unreasonably low throughput (a few elements per second at most). I have also removed the groupedWithin, but that did not seem to mitigate the MergeHub failures.

I may be completely wrong here, but, as already explained in my first reply, your proto file defines a service where the client sends a stream and the server sends a single response.

Hence my surprise: why are you immediately returning a response in your server implementation? Shouldn’t you wait until the stream finishes before sending the response back?

If the remote side abruptly stops, isn’t that an indication that you might be triggering that behaviour yourself by immediately sending the response from the server? To the client, that early response may signal that the server considers processing of the stream to have ended.

Thank you for responding. I did not write the original implementation on which this proof-of-concept test is based, so I do not fully understand why the acknowledgement reply is necessary in the first place. I believe gRPC requires a reply, though; I don’t think you can model an RPC endpoint with no return type, but I could be wrong about that.

Why not generate a promise, pass it into the merge hub, and map over its future to return the response? Then, when you consider the processing done somewhere inside your MergeHub, complete the future accordingly. That way you do not have this disconnected reply that may come too early. You are throttling, yes, but you are not continuing after the item has been processed, only immediately after plumbing it into the MergeHub.
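
As a rough sketch of what I mean (the TrackedEvent wrapper, the process function and the parallelism are all made up here, and an implicit ExecutionContext and Materializer are assumed to be in scope):

import akka.{Done, NotUsed}
import akka.grpc.scaladsl.Metadata
import akka.stream.scaladsl.{MergeHub, Sink, Source}
import scala.concurrent.{Future, Promise}

// Hypothetical wrapper: each event travels through the hub together with a
// promise that the hub's consumer completes once it has finished processing.
final case class TrackedEvent(event: FooEvent, processed: Promise[Done])

// Hypothetical processing function, standing in for whatever the consumer really does.
def process(event: FooEvent): Future[Done] = ???

// Consumer side of the hub (sketch): do the real work, then complete the per-event promise.
val hubSink: Sink[TrackedEvent, NotUsed] =
  MergeHub.source[TrackedEvent]
    .mapAsync(4) { tracked =>
      process(tracked.event).andThen { case result => tracked.processed.complete(result) }
    }
    .to(Sink.ignore)
    .run()

// Server side (sketch): reply only after every event in the request stream has been processed.
override def publish(in: Source[FooEvent, NotUsed], metadata: Metadata): Future[Received] =
  in.map(event => TrackedEvent(event, Promise[Done]()))
    .alsoTo(hubSink)                 // hand each event to the MergeHub
    .mapAsync(1)(_.processed.future) // continue only once the consumer has processed it
    .runWith(Sink.ignore)
    .map(_ => Received())            // the reply is no longer disconnected from the processing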

Do I even need the MergeHub? I took your suggestion and came up with this, which has me wondering what the original purpose of the MergeHub was in the first place:

    in runWith Sink.foldAsync(Received()) {
      case (received, event) =>
        process(event).map { _ =>
          Received(received.count + 1) // bump the count in the reply for each processed event
        }
    }

I no longer see the original errors, but then again it is not using a MergeHub anymore either. In the code above, process(event) does some asynchronous processing, effectively returning a Future[U].
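
For reference, process is roughly of this shape (simplified here; the concrete result type in our code does not matter, only that it is asynchronous):

// Simplified signature of the processing function referenced above.
def process(event: FooEvent): Future[Done] = ???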

Actually, this seems to solve the issue. I am still using the MergeHub here, but I am no longer getting the “MergeHub shutting down” errors either.

    (in.watchTermination() {
      case (_, terminationFuture) =>
        terminationFuture.map(_ => Received()) // reply only once the inbound stream has terminated
    })
      .toMat(mergeHub)(Keep.left)              // keep that Future[Received]; drain the events into the hub
      .run()

Thank you for pointing that out; we have been stumped by this issue for several months (yes, since last year)! The lesson learned here is that sending the gRPC reply right away effectively shuts down the stream before the MergeHub can drain it.
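
For anyone who finds this thread later, the full server implementation we ended up with looks roughly like this. It is the same EventServiceImpl as above, with the reply tied to the termination of the inbound stream; system.dispatcher is assumed to provide the ExecutionContext.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.grpc.scaladsl.Metadata
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

class EventServiceImpl[T](mergeHub: Sink[FooEvent, T])(implicit system: ActorSystem) extends EventServicePowerApi {
  import system.dispatcher // ExecutionContext for mapping the termination future

  override def publish(in: Source[FooEvent, NotUsed], metadata: Metadata): Future[Received] =
    (in.watchTermination() {
      case (_, terminationFuture) =>
        terminationFuture.map(_ => Received()) // reply only once the client's stream has terminated
    })
      .toMat(mergeHub)(Keep.left)              // keep that Future[Received]; drain the events into the hub
      .run()
}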
