Process request without consuming request entity

I have an akka http server application with a couple of routes where users can upload data (10G+). Some routes should return Conflict if an object already exists. Naturally we don't want to consume the full request entity if we know upfront that the request is invalid.

So one of our routes looks like this:

val validRequest = ...
if (!validRequest) {
  request.discardEntityBytes()
  complete(Conflict)
} else
  process(request)
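
To make the snippet above concrete, here is a minimal sketch of the route shape (objectExists and process are placeholder names standing in for our actual validation and upload handling):

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer

// Sketch only: reject with 409 before touching the entity if the object
// already exists, otherwise hand the request off to the real upload handling.
def uploadRoute(objectExists: Boolean, process: Route)
               (implicit mat: Materializer): Route =
  path("data") {
    put {
      extractRequest { request =>
        if (!objectExists) process
        else {
          request.discardEntityBytes() // drains the entity via Sink.ignore
          complete(StatusCodes.Conflict)
        }
      }
    }
  }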

Everything works fine if I query the server with curl directly; in production our server is behind nginx.
When I execute the same curl request against nginx, sometimes (not very often) nginx responds with 502 Bad Gateway and logs the following error:

> PUT /data HTTP/1.1
> User-Agent: curl/7.29.0
> Host: localhost:8081
> Accept: */*
> Expect: 100-continue
> Content-Type: application/octet-stream
> Content-Length: 209715200
>
< HTTP/1.1 100 Continue
< HTTP/1.1 502 Bad Gateway
< Date: Tue, 10 Sep 2019 15:06:44 GMT
< Content-Type: text/html
< Content-Length: 150
< Connection: keep-alive
* HTTP error before end of send, stop sending
<
<html>
<head><title>502 Bad Gateway</title></head>
<body>
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx</center>
</body>
</html>
* Closing connection 0

2019/09/10 14:40:15 [error] 2775#0: *346 writev() failed (32: Broken pipe) while sending request to upstream, client: 127.0.0.1, server: localhost, request: "PUT /data HTTP/1.1", upstream: "http://100.64.162.207:31000/data", host: "localhost:8081"

Looking at the akka http logic around handling the 100 Continue response, I see in HttpServerBluePrint.scala:

 Only when (and if) the application then requests data from the entity stream do we send out a `100 Continue` response and continue reading the request entity. The application can therefore determine itself whether it wants the client to send the request entity by deciding whether to look at the request entity data stream or not. If the application sends a response *without* having looked at the request entity the client receives this response *instead of* the `100 Continue` response and the server closes the connection afterwards.

So it looks to me like a race condition between discardEntityBytes (which will access the entity bytes) and completing the request with the final Conflict status code. It seems that on the nginx side the Conflict response was received while nginx was still writing data to the socket.

So I have a couple of questions:

  • Is it a bug in akka http that a response can be written while downstream is still writing data, causing unexpected behavior?
  • I can see that discarding the request entity was added to the default ExceptionHandler and RejectionHandler in akka http, but what would be the consequences of not releasing the request entity? In such cases the connection would be closed anyway after the response is rendered, so I assume all resources can still be released automatically.
  • As a workaround, would it make sense to schedule the discard of the request entity after some delay so that the completion of the request always comes first (see the sketch below)?
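
For illustration, such a delayed discard could look roughly like this (a sketch only; the 500ms delay is arbitrary):

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import akka.stream.Materializer

// Sketch: start draining the entity only after a short, arbitrary delay, so
// that completing the response (done elsewhere) is not raced by the drain.
def discardLater(request: HttpRequest)
                (implicit system: ActorSystem, mat: Materializer): Unit = {
  import system.dispatcher
  system.scheduler.scheduleOnce(500.millis) {
    request.discardEntityBytes()
  }
}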

Regards,
Kyrylo

Hi @kstokoz,

thanks for that report. I haven’t looked more deeply into it yet. discardEntityBytes() uses Sink.ignore, so it will still read the whole entity (somewhat counter-intuitively). Can you try entity.dataBytes.runWith(Sink.cancelled) instead?
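
That suggestion would look roughly like this (a sketch, assuming a request: HttpRequest and an implicit Materializer are in scope):

import akka.http.scaladsl.model.HttpRequest
import akka.stream.Materializer
import akka.stream.scaladsl.Sink

// Sketch: cancel the entity stream instead of draining it, signalling that
// we are not interested in the body at all.
def cancelEntity(request: HttpRequest)(implicit mat: Materializer): Unit =
  request.entity.dataBytes.runWith(Sink.cancelled)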

100-continue support isn’t super well-exercised so there might be bugs lurking and there might indeed be a race condition as you suggest.

Broken pipe means that the TCP connection was aborted. It’s hard to say without akka-http logs why akka-http would have aborted the connection.

If the entity is not discarded and the connection is not closed afterwards, the connection might just stall because the next request is never read.

So far, it’s not quite clear to me how those things relate to each other exactly. If it’s possible to reproduce, could you enable debug logging and maybe even akka.http.server.log-unencrypted-network-bytes = 100 to see what’s going on? If the connection is unencrypted behind nginx, a tcpdump might also help.
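
For reference, those settings could be wired into the app roughly like this (a sketch; putting the same keys into application.conf works just as well):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Sketch: turn on debug logging and log (up to) the first 100 bytes of
// unencrypted network traffic per element.
val debugConfig = ConfigFactory.parseString(
  """
    akka.loglevel = "DEBUG"
    akka.http.server.log-unencrypted-network-bytes = 100
  """).withFallback(ConfigFactory.load())

implicit val system = ActorSystem("debug", debugConfig)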

Johannes


Hi @jrudolph,

Thanks for the feedback.

I followed your suggestion and took a tcpdump on the backend node which is running the akka-http server, and I was surprised to see that the TCP sequences on the backend look the same in both the successful cases (properly returning 409) and the failure cases (nginx failing with 502). Normally it is a 409 Conflict response followed by an RST packet.

I then took a tcpdump on the nginx node so I could record TCP packets in both directions, from the client to nginx and from nginx to the backend.

In the screenshots below:
100.126.33.6 - client IP where the curl request to upload data originates from
100.120.147.75 - nginx node, nginx is running on port 8081
100.66.254.132 - backend, akka http server running on port 31000

It looks like if nginx receives the RST packet before it has forwarded the 409 response to the client, it fails with 502:

If the RST packet comes after the response has been forwarded to the client, the RST packet is ignored:

And sometimes everything completes normally without an RST:

Now I'm a bit puzzled about which side the issue actually is on.

Do you have any observations/suggestions on where to look further?

Regards,
Kyrylo

Hi Kyrylo,

usually, there shouldn’t be any RSTs.

It seems there’s some TCP Segmentation Offload going on, since the TCP packets are larger than common MTUs. That makes the dump less useful than it could be, since some TCP logic has already run on your network interface. You could try to disable TSO to get more accurate tcpdumps (e.g. see here).

The best for us would be if we could reproduce the behavior in isolation. Could you check if you get a similar result (an RST from akka-http) when running without nginx, even if the client doesn’t complain? Which versions are you using?

Johannes

Hi @jrudolph

I ran with akka http 10.1.8 and akka 2.5.23.

The RST packet is always happening; I created a small reproducer:

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.ExceptionHandler
import akka.stream.ActorMaterializer
import com.here.tofubox.utils.HttpUtils

import scala.concurrent.Future

object WebServer {

  implicit val system = ActorSystem("my-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def main(args: Array[String]): Unit = {
    val bindingFuture = Http().bindAndHandle(router.route, "localhost", 8080)

    println(
      s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    System.in.read()

    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}

object router {
  implicit val executionContext = WebServer.executionContext
  implicit val mat = WebServer.materializer

  val exceptionHandler: ExceptionHandler = ExceptionHandler {
    case _: RuntimeException =>
      complete(StatusCodes.Conflict)
  }

  val route =
    path("data") {
      handleExceptions(exceptionHandler) {
        put {
          extractRequest { request =>
            HttpUtils.discardEntityBytes(request.entity)
            val response: Future[Done] = Future.failed(new RuntimeException("It is not allowed to overwrite an existing object"))
            complete(response)
          }
        }
      }
    }
}

and while true; do curl -vv -X PUT -H "Content-Type: application/octet-stream" "http://localhost:8080/data" --data-binary @100MB.dat; sleep 1; done

Hope it helps further.

Regards,
Kyrylo

That’s great, thanks!

Created https://github.com/akka/akka-http/issues/2709.

It didn’t happen for me so far for the versions I checked. What is the implementation of HttpUtils.discardEntityBytes(request.entity)?

For completeness, HttpUtils.discardEntityBytes(request.entity) is:

 final def discardEntityBytes(entity: HttpEntity)
                              (implicit materializer: Materializer): Future[Done] =
    entity match {
      case _: HttpEntity.Strict  =>
        Future.successful(Done)

      case e if e.isKnownEmpty() =>
        Future.successful(Done)

      case e                     =>
        e.discardBytes().future()
    }

Also, I'm running on openjdk-11.0.1.jdk, if that changes anything.

I still cannot reproduce it in a standalone project. Can you try https://github.com/jrudolph/akka-http-2907-reproducer and see what changes are necessary to show the behavior?

The only change I needed to make was to add a bind at the end of Test.scala:

Http().bindAndHandle(route, "localhost", 8080)

I started the app, curl, and Wireshark, and I see the same picture:

In the app logs I see only:

[DEBUG] [09/12/2019 17:09:37.780] [main] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [09/12/2019 17:09:37.782] [main] [EventStream(akka://default)] Default Loggers started
[DEBUG] [09/12/2019 17:09:37.993] [main] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [09/12/2019 17:09:37.994] [main] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.NoopHostnameVerifier@3f4faf53
[DEBUG] [09/12/2019 17:09:38.721] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:8080
[DEBUG] [09/12/2019 17:09:52.151] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:09:54.701] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:09:56.948] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:09:59.108] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:01.275] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:03.424] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:05.571] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:07.748] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:09.924] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:12.079] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:14.244] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:16.399] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:18.578] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:20.752] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:22.899] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [09/12/2019 17:10:25.065] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted

Ohh wait, I will check again.

I think the RST I see now is actually curl trying different IPs:

*   Trying ::1...
* TCP_NODELAY set
* Connection failed
* connect to ::1 port 8080 failed: Connection refused
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /data HTTP/1.1
> Host: localhost:8080
> User-Agent: Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)
> Accept: */*
> Referer:
> Content-Type: application/octet-stream
> Content-Length: 2097152000
> Expect: 100-continue
>
< HTTP/1.1 409 Conflict
< Server: akka-http/10.1.8
< Date: Thu, 12 Sep 2019 15:17:02 GMT
< Connection: close
< Content-Type: text/plain; charset=UTF-8
< Content-Length: 96

I will take a look once again.

Hi @jrudolph,

I spent some time trying to get a reproducer for the RST packets issue.

One setup where the RST is triggered consistently is like this:

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpEntity, StatusCodes }
import akka.http.scaladsl.server.ExceptionHandler
import akka.stream.{ ActorMaterializer, Materializer }

import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }

object TestRSTApp extends App {
  implicit val system = ActorSystem()
  implicit val executionContext = system.dispatcher
  implicit val mat = ActorMaterializer()

  import akka.http.scaladsl.server.Directives._

  val exceptionHandler: ExceptionHandler = ExceptionHandler {
    case ex: RuntimeException =>
      println(ex.getMessage)
      complete(StatusCodes.Conflict)
  }

  val route =
    path("data") {
      handleExceptions(exceptionHandler) {
        put {
          extractRequest { request =>
            discardEntityBytes(request.entity)
            val promise = Promise[String]()

            system.scheduler.scheduleOnce(1000.millis) {
              promise.failure(new RuntimeException("It is not allowed to overwrite an existing object"))
            }

            complete(promise.future)
          }
        }
      }
    }

   final def discardEntityBytes(entity: HttpEntity)
                              (implicit materializer: Materializer): Future[Done] =
      entity match {
        case _: HttpEntity.Strict =>
          Future.successful(Done)

        case e if e.isKnownEmpty() =>
          Future.successful(Done)

        case e =>
          e.discardBytes().future()
      }

  Http().bindAndHandle(route, "127.0.0.1", 31000)

}

and executing curl with a payload bigger than the default accepted limit of 8MB:
while true; do curl -vv -X PUT -H "Content-Type: application/octet-stream" "http://127.0.0.1:31000/data" --data-binary @output-10M.dat; sleep 1; done

In the curl output:

*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 31000 (#0)
> PUT /data HTTP/1.1
> Host: 127.0.0.1:31000
> User-Agent: Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)
> Accept: */*
> Referer:
> Content-Type: application/octet-stream
> Content-Length: 10485760
> Expect: 100-continue
>
* Done waiting for 100-continue
< HTTP/1.1 409 Conflict
< Server: akka-http/10.1.8
< Date: Fri, 13 Sep 2019 11:15:34 GMT
< Connection: close
< Content-Type: text/plain; charset=UTF-8
< Content-Length: 96
<
* we are done reading and this is set to close, stop send
* Closing connection 0
The request could not be processed because of conflict in the request, such as an edit conflict.

I still think this is a slightly different RST than the one I see in the original case, as there is no 100 Continue here. If I additionally set akka.http.server.parsing.max-content-length = infinite then the 100 Continue is rendered and there is no RST packet in the tcpdump either.

I will try to isolate the original issue further; I can reproduce it within my app quite often. Maybe there are some tracing logs I can enable that would help localize the issue further?

Regards,
Kyrylo

Could not find how to attach a file here, so I added the stream debug output to the GitHub issue: https://github.com/akka/akka-http/issues/2709#issuecomment-531223135