In an Akka streaming project we are seeing a very high volume of these error messages:
```
Error in stage [akka.stream.scaladsl.MergeHub$$anon$2$$anon$3]: Upstream producer failed with exception, removing from MergeHub now
```
After further investigation of the stack trace, I have determined that the in-port of the HTTP/2 connection was cancelled abruptly (a cancellation in the HTTP/2 sense), most likely by the other side. It is not clear to me whether this is an HTTP/2-specific issue or a gRPC-specific one, but I suspect the root cause is not related to Akka Streams.
The issue is also very difficult to reproduce locally; it only seems to occur in a higher environment with multiple replicas and a high volume of events.
The RPC producer uses the following Protocol Buffers service definition:
```proto
service EventService {
  rpc publish (stream FooEvent) returns (Received);
}
```
The producer itself is fairly simple; the code below is called very frequently:
```scala
eventService
  .publish()
  .invoke(Source.single(someNewEvent))
```
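In the snippet above, the `Future[Received]` returned by `invoke` is discarded, so whatever status the client sees when a call goes wrong is lost. Since the question is how to narrow down the cause by elimination, it may help to log that status. The following is a minimal sketch, assuming the client reports failed calls as an `io.grpc.StatusRuntimeException` (or wraps one as a cause); `describeFailure` and `isCancellation` are hypothetical helpers, not part of Akka gRPC. `Status.fromThrowable` walks the cause chain and falls back to `UNKNOWN` when no gRPC status is found.

```scala
import io.grpc.Status

object GrpcFailureLogging {
  // Hypothetical helper: renders a failed call's exception as a log line that keeps the
  // gRPC status code (CANCELLED, DEADLINE_EXCEEDED, UNAVAILABLE, ...).
  def describeFailure(t: Throwable): String = {
    val status = Status.fromThrowable(t) // searches the cause chain; UNKNOWN if none found
    val description = Option(status.getDescription).getOrElse("<no description>")
    s"gRPC status ${status.getCode}: $description"
  }

  // Hypothetical helper: true only when the failure carries an explicit CANCELLED status.
  def isCancellation(t: Throwable): Boolean =
    Status.fromThrowable(t).getCode == Status.Code.CANCELLED
}
```

On the producer side this could be used as `eventService.publish().invoke(Source.single(someNewEvent)).failed.foreach(t => system.log.warning(GrpcFailureLogging.describeFailure(t)))` with an implicit `ExecutionContext` in scope. By the usual gRPC status semantics, `CANCELLED` would be consistent with an explicit cancellation, `DEADLINE_EXCEEDED` with a client deadline, and `UNAVAILABLE` with a connection-level problem such as a replica going away. If the producer's futures succeed while the server still logs the MergeHub error, that would suggest the cancellation is not initiated by the client.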
The consuming side is equally straightforward and uses a MergeHub. While this is not strictly Akka gRPC code, it produces the error message above, which suggests a root cause in either gRPC or HTTP/2:
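```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.grpc.scaladsl.Metadata
import akka.stream.scaladsl.{MergeHub, Sink, Source}
import scala.concurrent.Future

class EventServiceImpl[T](mergeHub: Sink[FooEvent, T])(implicit system: ActorSystem) extends EventServicePowerApi {
  override def publish(in: Source[FooEvent, NotUsed], metadata: Metadata): Future[Received] = {
    in runWith mergeHub // attach this call's request stream to the shared hub
    Future.successful(Received()) // respond immediately, without waiting for `in` to finish
  }
}
// MergeHub.source must be materialized to obtain the Sink; Sink.ignore stands in for the real consumer
val eventService = new EventServiceImpl(MergeHub.source[FooEvent].to(Sink.ignore).run())
```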
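The MergeHub log line only says that some upstream failed; it does not say which call it was or what the exception was. One way to recover that information is to watch each inbound stream's termination before it reaches the hub. The sketch below assumes Akka 2.6 Streams; `FooEvent` is a stand-in case class for the generated message, and `attach` and `producerId` are hypothetical names (the id could, for example, come from a request header).

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Future
import scala.util.{Failure, Success}

object MergeHubDiagnostics {
  // Hypothetical stand-in for the generated protobuf message class.
  final case class FooEvent(id: Int)

  // Hypothetical helper: attaches one inbound stream to the hub and logs how that stream ended.
  // The returned Future completes when the inbound stream completes, and fails with the
  // upstream's exception if it fails (for instance when the underlying HTTP/2 stream is reset).
  def attach(in: Source[FooEvent, NotUsed], hub: Sink[FooEvent, NotUsed], producerId: String)(
      implicit system: ActorSystem): Future[Done] = {
    import system.dispatcher // ExecutionContext for onComplete
    val terminated: Future[Done] =
      in.watchTermination()(Keep.right).to(hub).run() // materializer derived from the implicit system
    terminated.onComplete {
      case Success(_)  => system.log.info("producer {} completed normally", producerId)
      case Failure(ex) => system.log.warning("producer {} failed: {}", producerId, ex)
    }
    terminated
  }
}
```

Because `attach` returns the termination future, `publish` could return something like `attach(in, hub, producerId).map(_ => Received())` instead of `Future.successful(Received())`, so the response is only sent once the request stream has ended. Responding before the request stream is consumed lets the server finish the call while the client may still be sending. Whether that is what triggers the cancellations here is only a hypothesis, but it is a cheap one to test and eliminate.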
To restate my question: under what conditions would an HTTP/2 gRPC stream be cancelled? The error message above does not indicate the source of the cancellation, so the root cause has to be determined inductively, through a process of elimination. Is there a systematic way to apply that approach here to identify the root cause? Any thoughts are appreciated.