Handling upstream IOExceptions

How do I catch upstream exceptions such as java.io.IOException when returning responses?
I tried using the CancellationStrategy.FailStage attribute, but the stage still completes successfully when using watchTermination.

// my.app.WebServer

Http()
      .newServerAt(config.server.host, config.server.port)
      .connectionSource()
      .log("connection", _.remoteAddress.toString)
      .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Debug,
                                          onFinish = Attributes.LogLevels.Debug,
                                          onFailure = Attributes.LogLevels.Error))
      .addAttributes(Attributes(CancellationStrategy(CancellationStrategy.FailStage)))
      .recover {
        case ex =>
          log.error("Connection error", ex)
          throw ex
      }
      .runForeach(_.handleWithAsyncHandler(route))

// my.app.Api

  private def respondWith(httpResponse: HttpResponse): Route = {
    complete {
      httpResponse
        .withEntity(HttpEntity(
          httpResponse.entity.contentType,
          httpResponse.entity.dataBytes
            .log("response")
            .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Off,
                                                onFinish = Attributes.LogLevels.Info,
                                                onFailure = Attributes.LogLevels.Error))
            .addAttributes(Attributes(CancellationStrategy(CancellationStrategy.FailStage)))
            .watchTermination()((_, future) =>
              future.onComplete {
                case Success(_)  => log.info("Response completed")
                case Failure(ex) => log.error("Response failed", ex)
              })
        ))
    }
  }

// Log

2021-12-13T08:11:55,346 DEBUG [63] akka.io.TcpListener: New connection accepted
2021-12-13T08:11:55,348 DEBUG [56] akka.stream.Materializer: [connection] Element: /0:0:0:0:0:0:0:1:51294
2021-12-13T08:11:55,353 DEBUG [56] ...
2021-12-13T08:11:57,675 DEBUG [52] akka.io.TcpIncomingConnection: Closing connection due to IO error java.io.IOException: Broken pipe
2021-12-13T08:11:57,911 INFO  [17] my.app.Api$: Response completed
2021-12-13T08:11:57,912 INFO  [17] akka.stream.Materializer: [response] Downstream finished, cause: SubscriptionWithCancelException$NoMoreElementsNeeded$: null

Hi @mpkfa,

Yes, your observation is correct. The reason is that Akka HTTP 10.2 is still compatible with Akka 2.5, which has no way to propagate a cause when a stream is cancelled.

I created issue #3969, "Propagate connection errors to outstanding entity sources", in akka/akka-http on GitHub to make sure we don’t forget about this when we finally drop Akka 2.5 support in 10.3.0.

For now, I would treat all cancellations in response entities as an error.

See also issue #1718, "Handling on a server connections closed by clients", in akka/akka-http on GitHub, which seems to be slightly related.

Johannes

Hi @jrudolph
Thanks for the reply.

How would I actually treat all cancellations as errors?
My understanding was that CancellationStrategy.FailStage is used for this, and now you are telling me this does not work in the current version of Akka HTTP?

I tried using the server binding code from the linked issues, but again got the same result.

Currently, akka-http cannot distinguish between cancellation with an error and cancellation without one, so it just cancels.

watchTermination treats (regular) cancellation and completion the same way, so you might not be able to use that one. You might need a custom stage that waits for cancellation to do error handling to be able to distinguish it from regular completion.

Basically you need to treat the success case here as a failure:

        override def onDownstreamFinish(cause: Throwable): Unit = {
          cause match {
            case _: SubscriptionWithCancelException.NonFailureCancellation =>
              finishPromise.success(Done)
            case ex =>
              finishPromise.failure(ex)
          }
          cancelStage(cause)
        }
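
To make the idea concrete, here is a minimal sketch of such a custom stage, assuming Akka 2.6 (where onDownstreamFinish receives a cause). The class name CancellationAsFailure and its onTermination callback are hypothetical and not part of Akka. The stage passes bytes through unchanged and reports every cancellation as a failure, while only a normal upstream finish counts as success.

```scala
import akka.Done
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not an Akka API: pass-through stage that reports
// how the stream ended through the given callback.
final class CancellationAsFailure(onTermination: Try[Done] => Unit)
    extends GraphStage[FlowShape[ByteString, ByteString]] {

  val in: Inlet[ByteString] = Inlet("CancellationAsFailure.in")
  val out: Outlet[ByteString] = Outlet("CancellationAsFailure.out")
  override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      // Upstream delivered everything: the only case reported as success.
      override def onUpstreamFinish(): Unit = {
        onTermination(Success(Done))
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        onTermination(Failure(ex))
        failStage(ex)
      }

      // Any cancellation, including a "regular" one, is reported as a failure.
      override def onDownstreamFinish(cause: Throwable): Unit = {
        onTermination(Failure(cause))
        cancelStage(cause)
      }

      setHandlers(in, out, this)
    }
}
```

It could be attached to a response entity with something like entity.dataBytes.via(new CancellationAsFailure(result => ...)). Note that this sketch cannot tell a client disconnect apart from any other cancellation; it simply treats them all as errors.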

Thanks a lot @jrudolph

The provided solution worked!

Great to know!

@jrudolph It does not work in all cases, though.
For smaller response payloads, the IOException is logged after the response stream completes “successfully”.

I am using curl with --limit-rate to simulate a slow client.

I managed to mitigate this by using low values for akka.http.server.socket-options.so-send-buffer-size, but that reduces download speed a lot.
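
For reference, the setting can also be overridden programmatically instead of in application.conf. This is a minimal sketch, assuming the Typesafe Config library that ships with Akka; the object name SmallSendBuffer and the value 8192 are purely illustrative and not a recommendation.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical holder object; only the setting path comes from the post above.
object SmallSendBuffer {
  private val path = "akka.http.server.socket-options.so-send-buffer-size"

  // Override the send buffer size (in bytes) and keep every other setting
  // from the regular configuration. 8192 is an arbitrary example value.
  val config: Config = ConfigFactory
    .parseString(s"$path = 8192")
    .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit =
    // The resulting Config would be passed to ActorSystem("name", config).
    println(config.getBytes(path))
}
```
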

// my.app.WebServer

    Http()
      .newServerAt(server.host, server.port)
      .connectionSource()
      .runForeach(_.handleWithAsyncHandler(route, server.maxConnections))

// my.app.Api

  def respondWith(status: StatusCode, entity: HttpEntity, headers: Seq[HttpHeader]): Route =
    complete {
      HttpResponse(status)
        .withHeaders(headers)
        .withEntity(
          HttpEntity(
            contentType = entity.contentType,
            data = entity.dataBytes.via(new ResponseFlow())
          ))
    }

class ResponseFlow extends GraphStage[FlowShape[ByteString, ByteString]] {

  log.info("Returning response...")

  val in = Inlet[ByteString]("ResponseFlow.in")
  val out = Outlet[ByteString]("ResponseFlow.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    def success(): Unit = log.info("Response successful")

    def failure(ex: Throwable, result: String): Unit = log.error(s"Response failed: $result", ex)

    setHandler(
      in,
      new InHandler {
        override def onPush(): Unit = push(out, grab(in))

        override def onUpstreamFailure(ex: Throwable): Unit = {
          failure(ex, "upstream failure")
          super.onUpstreamFailure(ex)
        }

        override def onUpstreamFinish(): Unit = {
          success()
          super.onUpstreamFinish()
        }
      }
    )

    setHandler(
      out,
      new OutHandler {
        override def onPull(): Unit = pull(in)

        override def onDownstreamFinish(cause: Throwable): Unit = {
          failure(cause, "downstream failure")
          cancelStage(cause)
        }
      }
    )
  }
}

// Log

2021-12-20T13:55:30,463 INFO  [45] my.app.ResponseFlow: Returning response...
2021-12-20T13:55:30,546 INFO  [30] my.app.ResponseFlow: Response successful
2021-12-20T13:56:31,756 DEBUG [47] akka.io.TcpIncomingConnection: Closing connection due to IO error java.io.IOException: Connection reset by peer