Relentless "Upstream producer failed with exception, removing from MergeHub now" failures

I’ve been troubleshooting a problem off and on for the past several months that has me completely flummoxed. I’ve set up a simple repro case: a gRPC emitter and a gRPC consumer. The emitter simulates firing events and the consumer simulates slow processing of them. Periodically the HTTP/2 stream is cancelled without any additional context or indication of why. The lack of error detail is what keeps me from pinning down the root cause.

Here is the code for the emitter (fired repeatedly over gRPC):

def publish(event: FooEvent): IO[Unit] = {
    IO.fromFuture {
      IO {
        eventService
          .publish()
          .invoke(
            Source
              .single(event)
              .log("before-dispatch")
              .withAttributes(
                Attributes
                  .logLevels(
                    onElement = Logging.WarningLevel,
                    onFinish = Logging.InfoLevel,
                    onFailure = Logging.DebugLevel
                  )
              )
          )
      }
    } map (_ => ())
}

Protobuf service definition:

service EventService {
  rpc publish (stream FooEvent) returns (Received);
}

The sink:

  val sink: Sink[GrpcFooEvent, NotUsed] =
    (MergeHub.source[GrpcFooEvent] named "Test Sink" async) to {
      Sink.foreach { _ =>
        // simulate slow processing (this blocks a dispatcher thread)
        Thread.sleep(5000)
      }
    } run

  override def publish(
      in: Source[FooEvent, NotUsed],
      metadata: Metadata
  ): Future[Received] = {
    in runWith sink
    Future.successful(Received())
  }

To generate traffic, I’m running a Monad[IO].whileM_ ... loop for about 25 seconds that repeatedly fires the publish method.
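
Roughly, the loop looks like this (a simplified sketch: it assumes cats-effect 3, stubs out publish, and builds FooEvent with the ScalaPB-generated default arguments):

import cats.Monad
import cats.effect.{IO, IOApp}

import scala.concurrent.duration._

object TrafficGenerator extends IOApp.Simple {

  // stand-in for the real publish method shown above
  def publish(event: FooEvent): IO[Unit] = IO.unit

  def run: IO[Unit] =
    IO.realTime.flatMap { start =>
      // keep firing publish until roughly 25 seconds have elapsed
      Monad[IO].whileM_(IO.realTime.map(now => now - start < 25.seconds)) {
        publish(FooEvent()) // FooEvent() relies on the generated default arguments
      }
    }
}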

Here is the error I am seeing periodically:

[ERROR] [04/20/2022 11:00:05.045] [foo-events-akka.actor.default-dispatcher-7] [akka://foo-events/system/Materializers/StreamSupervisor-0/flow-732-0-unnamed] Error in stage [akka.stream.scaladsl.MergeHub$$anon$2$$anon$3]: Upstream producer failed with exception, removing from MergeHub now
akka.stream.scaladsl.MergeHub$ProducerFailed: Upstream producer failed with exception, removing from MergeHub now
        at akka.stream.scaladsl.MergeHub$$anon$2$$anon$3.onUpstreamFailure(Hub.scala:352)
        at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:525)
        at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
        at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
        at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
        at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
        at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:580)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.base/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:295)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: akka.http.scaladsl.model.http2.PeerClosedStreamException: Stream with ID [731] was closed by peer with code CANCEL(0x08)

What can I do to properly isolate the root cause of these errors?

Hi MouthySpider,

I’m not quite following both sides of the communication. Are you using Akka gRPC? I really like using the Akka-generated client; for example, my blog post has an example of streaming samples from a file to the gRPC endpoint here, which uses a bidirectional stream.
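
For reference, a client-streaming call through the generated client looks roughly like the sketch below. It is only an illustration of the idea and makes a few assumptions: the proto above is compiled with Akka gRPC (so an EventServiceClient is generated), FooEvent has the ScalaPB default constructor, and the server listens locally on port 8080 without TLS. The point is that a single call carries a whole stream of events over one HTTP/2 stream, instead of one invoke per event.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.stream.scaladsl.Source

import scala.concurrent.Future

object StreamingEmitterSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("emitter")
    import system.dispatcher

    // plain-text HTTP/2 connection to an assumed local endpoint
    val settings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8080).withTls(false)
    val client = EventServiceClient(settings) // generated by Akka gRPC from EventService

    // a single client-streaming call carrying many events
    val events: Source[FooEvent, NotUsed] = Source(1 to 100).map(_ => FooEvent())
    val response: Future[Received] = client.publish(events)

    response.onComplete { result =>
      println(s"publish completed with: $result")
      system.terminate()
    }
  }
}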

Regarding the sink: I absolutely HATE the Thread.sleep… you should never block a thread. Maybe use another actor with a scheduler and do an ask to make sure you’re not blocking.
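
As a rough illustration of a non-blocking alternative (not the actor-with-a-scheduler approach, which the next reply demonstrates in full), the same 5-second "slow consumer" can be simulated with mapAsync plus akka.pattern.after; GrpcFooEvent is your generated type:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{MergeHub, Sink}

import scala.concurrent.Future
import scala.concurrent.duration._

object NonBlockingSink {
  def slowSink(implicit system: ActorSystem): Sink[GrpcFooEvent, NotUsed] = {
    import system.dispatcher
    MergeHub
      .source[GrpcFooEvent]
      .mapAsync(parallelism = 1) { event =>
        // each element still "takes" 5 seconds, but the delay runs on the
        // system scheduler instead of parking a dispatcher thread
        after(5.seconds, system.scheduler)(Future.successful(event))
      }
      .to(Sink.ignore)
      .run()
  }
}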

Another thing you can do is wrap your source with a retry / backoff as described here. BTW, the whole page covers error handling with streams.

Hi MouthySpider,

I decided to go ahead and create a TimedResponseActor using the Akka Typed API, which you can see here:

package com.mread.timers

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}

import scala.concurrent.duration.DurationInt
import scala.util.Random

object TimedResponseActor {
  sealed trait Request
  final case class Ping(whom: String, replyTo: ActorRef[Response]) extends Request
  final case class Complete(whom: String, replyTo: ActorRef[Response]) extends Request
  final case class Stop() extends Request

  sealed trait Response
  final case class Received(whom: String) extends Response

  // min and max used to randomly pick a schedule time
  def apply(minInclusiveSeconds: Int, maxExclusiveSeconds: Int): Behavior[Request] = Behaviors.withTimers { timers =>
    Behaviors.setup { context =>

      val r = new Random()

      Behaviors.receiveMessage {
        case Ping(whom, replyTo) =>
          val scheduleDuration = r.between(minInclusiveSeconds, maxExclusiveSeconds).seconds
          timers.startSingleTimer(Complete(whom, replyTo), scheduleDuration)
          context.log.info(s"Ping received and scheduled for $whom for $scheduleDuration seconds")
          Behaviors.same
        case Complete(whom, replyTo) =>
          replyTo ! Received(whom)
          context.log.info(s"Complete received and sent for $whom")
          Behaviors.same
        case Stop() =>
          context.log.info("Stop received so shutting down.")
          Behaviors.stopped
        case _ =>
          Behaviors.same
      }
    }
  }

}

Next I created two streaming tests. The first maintains response element order using mapAsync. In the second, I wrapped the source with RestartSource.withBackoff and used mapAsyncUnordered to increase throughput. I found it fun to play around with various restart settings. Note: the second test will usually fail as is, but you can play with the number of restarts allowed within a time frame to see if you can get it to pass. Enjoy.

package com.mread

import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed._
import akka.actor.typed.scaladsl.AskPattern._
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Source}
import akka.util.Timeout
import com.mread.timers.TimedResponseActor
import com.mread.timers.TimedResponseActor.{Ping, Response, Stop}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.slf4j.LoggerFactory

import scala.concurrent.Await
import scala.concurrent.duration._

class TimedResponseActorSpec extends AnyWordSpec
  with BeforeAndAfterAll
  with Matchers {

  private val log = LoggerFactory.getLogger(getClass)

  val testKit = ActorTestKit()

  override def afterAll(): Unit = {
    log.info("Shutting down.")
    testKit.shutdownTestKit()
  }

  "Testing Akka Streams with Akka Typed Actors" must {
    "support mapAsync and verify responses" in {

      // random schedule 5 - 8 seconds, should be less than timeout below
      log.info("creating timedResponseActor with random schedule minInclusiveSeconds=5, maxExclusiveSeconds=8")
      val timedResponseActor = testKit.spawn(TimedResponseActor(5, 8), "TimedResponse")

      implicit val timeout: Timeout = 10.seconds
      implicit val system = testKit.system

      val future = Source(1 to 10)
        // if you don't need to keep element order you can increase throughput with mapAsyncUnordered
        .mapAsync(5) { i =>
          // timeout in the ask causes the stream to fail
          timedResponseActor.ask { ref : ActorRef[Response] =>
            Ping(i.toString, ref)
          }
        }
        .runForeach(resp => log.info(s"response received ${resp}"))
      val result = Await.result(future, 1000.minutes)
      log.info(result.toString)
      assert(result.toString == "Done")

      timedResponseActor ! Stop()
    }
  }

  "Testing Akka Streams with Akka Typed Actors" must {
    "support mapAsyncUnordered and restart with backoff on timeouts" in {

      // random schedule 5 - 11 seconds which is more than the timeout
      log.info("creating timedResponseActor with random schedule minInclusiveSeconds=5, maxExclusiveSeconds=11")
      val timedResponseActor = testKit.spawn(TimedResponseActor(5, 11), "TimedResponse")

      implicit val timeout: Timeout = 10.seconds
      implicit val system = testKit.system

      val settings = RestartSettings(
        minBackoff = 3.seconds,
        maxBackoff = 10.seconds,
        randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
      ).withMaxRestarts(4, 3.minutes) // limits the number of restarts to 4 within 3 minutes

      val restartSource = RestartSource.withBackoff(settings) { () =>
        Source(1 to 10)
          // if you need to maintain element order, use mapAsync instead
          .mapAsyncUnordered(5) { i =>
            // timeout in the ask causes the stream to fail
            timedResponseActor.ask { ref: ActorRef[Response] =>
              Ping(i.toString, ref)
            }
          }
      }
        .runForeach(resp => log.info(s"response received ${resp}"))

      val result = Await.result(restartSource, 1000.minutes)
      log.info(result.toString)
      assert(result.toString == "Done")

      timedResponseActor ! Stop()
    }
  }

}

The build.sbt looks like this:

name := "ScalaPlayII"

val akkaVersion = "2.6.19"
lazy val scalaTestVersion = "3.2.9"
lazy val logbackVersion  = "1.2.3"

ThisBuild / scalaVersion := "2.13.8"
ThisBuild / organization := "com.mread.scalaplay"

lazy val root = (project in file("."))
  .settings(
    libraryDependencies ++= Seq(

      "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
      "com.typesafe.akka" %% "akka-stream" % akkaVersion,

      //Logback
      "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
      "ch.qos.logback" % "logback-classic" % logbackVersion,

      "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
      "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
      "org.scalatest" %% "scalatest" % scalaTestVersion% Test

    )
  )

This has been resolved; see here.