Many requests result in "response entity was not subscribed after ..."

Hi. I’ve already asked this question on StackOverflow, but nobody has answered. You can check it here. I’ll repeat the problem below:

I have an akka-streams application with a graph that includes a bunch of HTTP calls. It works perfectly for a while (10-20 minutes) and then fails with the following error:

[LoaderActorSystem-akka.actor.default-dispatcher-54] [LoaderActorSystem/Pool(shared->https://chpp.hattrick.org:443)] [3 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 5 minutes. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /chppxml.ashx Empty -> 200 OK Default(18198 bytes)

As recommended here and in the documentation, I’ve implemented parsing of the responses as follows:

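    import scala.concurrent.Future
    import scala.util.{Failure, Success, Try}
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpResponse
    import akka.stream.scaladsl.Flow

    // URL (the host), T (request context) and parse are defined elsewhere;
    // an implicit ActorSystem and ExecutionContext are assumed to be in scope.
    val httpsFlow = Http().cachedHostConnectionPoolHttps[T](URL)

    val unmarshallFlow = Flow[(Try[HttpResponse], T)].mapAsync(2) {
      case (Success(response), t) =>
        response.entity.dataBytes
          .runReduce(_ ++ _)
          .map(data => (data.utf8String, t))
      case (Failure(e), _) =>
        Future.failed(e) // without this case a failed request throws a MatchError
    }
      .map(parse)

    httpsFlow.via(unmarshallFlow).async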

So before this error my app performs thousands of HTTP calls, and I can’t find the reason why the problem appears.

The subscription timeout is configured to 5 minutes (I’ve tried values between 10 seconds and 1 hour; it doesn’t help):

    akka.http.host-connection-pool.response-entity-subscription-timeout = 5.minute
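Since changing the value had no visible effect, one thing worth ruling out is that the override never reaches the running system. A minimal sketch, assuming Typesafe Config (the library Akka reads its settings with) and a hypothetical helper named `effectiveTimeout`, that layers an override over the loaded configuration and reads back the duration the pool would see:

```scala
import com.typesafe.config.ConfigFactory
import java.time.Duration

object PoolTimeoutCheck {
  // Setting path taken from the config line above
  val Path = "akka.http.host-connection-pool.response-entity-subscription-timeout"

  // Hypothetical helper: parse an override, fall back to application.conf /
  // reference.conf, and return the duration that ends up in effect.
  def effectiveTimeout(overrideHocon: String): Duration =
    ConfigFactory
      .parseString(overrideHocon)
      .withFallback(ConfigFactory.load())
      .getDuration(Path)
}
```

Printing `ConfigFactory.load().getDuration(PoolTimeoutCheck.Path)` at startup, or building the system explicitly with `ActorSystem("LoaderActorSystem", config)` from the merged config, shows whether the setting was actually applied.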

Changing the unmarshalling code to Unmarshal(result.entity).to[String] didn’t help either.

Hey… 2 weeks, no answer :(
Can anyone advise me on how to debug or narrow down the problem?

It’s very hard to diagnose without knowing more about the app and what it does. There’s nothing obviously wrong with that sample at least (though you could use toStrict instead of runReduce).

Usually that error is caused by forgetting to consume or discard some of the response entities, for example consuming them for OK responses but forgetting to discard them for error status codes.
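A minimal sketch of handling every status code, assuming Akka 2.6 / Akka HTTP 10.x with an implicit `ActorSystem` in scope (which also supplies the `Materializer`); `bodyIfOk` is a hypothetical helper name. Each branch either reads or discards the entity, so no response is left unsubscribed:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

object EntityHandling {
  // Hypothetical helper: returns Some(body) for 200 OK and None otherwise,
  // but always consumes or discards the entity.
  def bodyIfOk(response: HttpResponse)(implicit system: ActorSystem): Future[Option[String]] = {
    implicit val ec: ExecutionContext = system.dispatcher
    if (response.status == StatusCodes.OK)
      // Read the entity for successful responses
      response.entity.dataBytes
        .runFold(ByteString.empty)(_ ++ _)
        .map(bytes => Some(bytes.utf8String))
    else
      // Discard it for everything else; otherwise the pool keeps waiting for a
      // subscriber until it reports "Response entity was not subscribed"
      response.discardEntityBytes().future.map(_ => None)
  }
}
```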

It could also be caused by dispatcher/execution context starvation: if you accidentally block somewhere else, the reduce in the response-handling stream may not get a chance to execute. I would expect it to still subscribe, just slowly, though. (There is a good overview of starvation in the Akka docs here: Dispatchers • Akka Documentation)
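If blocking is the suspect, the approach described in those docs is to move blocking calls onto a dedicated dispatcher so the default one stays free for stream stages. A sketch, where the dispatcher name `blocking-io-dispatcher`, its pool size, and the helper `runBlocking` are assumptions for illustration:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.{ExecutionContext, Future}

object BlockingIsolation {
  // Hypothetical dispatcher definition; the pool size is illustrative only
  val config: Config = ConfigFactory
    .parseString(
      """blocking-io-dispatcher {
        |  type = Dispatcher
        |  executor = "thread-pool-executor"
        |  thread-pool-executor { fixed-pool-size = 16 }
        |  throughput = 1
        |}""".stripMargin)
    .withFallback(ConfigFactory.load())

  // Runs blocking work (JDBC, file IO, Thread.sleep, ...) off the default dispatcher
  def runBlocking[A](work: => A)(implicit system: ActorSystem): Future[A] = {
    val blockingEc: ExecutionContext = system.dispatchers.lookup("blocking-io-dispatcher")
    Future(work)(blockingEc)
  }
}
```

The `ActorSystem` has to be created with this config, e.g. `ActorSystem("LoaderActorSystem", BlockingIsolation.config)`, for the lookup to succeed.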

There’s also a real issue with the flow-based client APIs: incoming responses can get stuck in buffers because of backpressure, which makes it hard to find a good value for the timeout.

See these issues / comments for more information:

Here are two possible solutions:

  • Use mapAsync with singleRequest and immediately call toStrict to load the response into memory (if that is what you want/need)
  • Keep using stream-based APIs. If you are certain that all entities are read eventually, increase or disable the timeout. After all, it’s meant as a fallback mechanism that prevents a pool from stalling completely.
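A minimal sketch of the first option, assuming Akka HTTP 10.x and an implicit `ActorSystem`. `strictFlow` and `httpFlow` are hypothetical names, and the request sender is passed in as a function so the flow can be exercised without network access. Because sending and `toStrict` happen inside one `mapAsync`, each entity is subscribed as soon as its response arrives:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object StrictRequests {
  // Sends each request and loads the whole entity into memory inside the same
  // mapAsync stage, so no response waits in a buffer for a subscriber.
  // Emits (status code, body) pairs in request order.
  def strictFlow(parallelism: Int, strictTimeout: FiniteDuration)(
      send: HttpRequest => Future[HttpResponse]
  )(implicit system: ActorSystem): Flow[HttpRequest, (Int, String), NotUsed] = {
    implicit val ec: ExecutionContext = system.dispatcher
    Flow[HttpRequest].mapAsync(parallelism) { request =>
      send(request).flatMap { response =>
        // toStrict subscribes to the entity immediately and collects all its bytes
        response.entity.toStrict(strictTimeout).map { strict =>
          (response.status.intValue, strict.data.utf8String)
        }
      }
    }
  }

  // Real wiring: every request goes through Http().singleRequest
  def httpFlow(parallelism: Int, strictTimeout: FiniteDuration)(
      implicit system: ActorSystem): Flow[HttpRequest, (Int, String), NotUsed] =
    strictFlow(parallelism, strictTimeout)(request => Http().singleRequest(request))
}
```

The trade-off is memory: every body is fully loaded, and `parallelism` bounds how many of them are in flight at once.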

Regarding your code, your flow-based approach roughly corresponds to:

    requests
      .mapAsyncUnordered(maxConnections)(Http().singleRequest(_)) // internal representation of the flow-based API
      .mapAsync(2)(yourProcessing)
      // further processing

mapAsync introduces buffers, so even when the sink pulls no data, up to max-connections + 2 responses can be buffered for processing. Only the 2 inside your own mapAsync will subscribe to their entities; the others will time out if the stream is backpressured. What you can do instead is:

    requests
      .mapAsync(whatEverYouNeed) { req =>
        Http().singleRequest(req).flatMap(yourProcessingThatReadsEntity)
      }
      // further processing

This ensures that every response you receive is processed immediately instead of getting stuck in an internal buffer.

Hi, thanks for your answer.

  • Keep using stream-based APIs. If you are certain that all entities are read eventually, increase or disable the timeout. After all, it’s meant as a fallback mechanism that prevents a pool from stalling completely.

Unfortunately, this doesn’t work for me. I set a 1-hour timeout, and after a while my whole flow hangs again and throws the exception an hour later.

I’ve also tried the single-request approach, and it doesn’t help either: the flow fails with the same “Response entity was not subscribed after…” error.

That should be impossible, right? It seems I’m doing something wrong.

If it’s hard to diagnose, I can provide the actual code. It’s here: here. I’ve left the “old” flow-based API code there, commented out.

It’s so sad that I can’t do anything about it. This problem prevents me from using Akka in my projects, even though the project is almost done :(

I’m even ready to pay for help :)

You still use two mapAsync calls, which introduces a second buffer.

That said, if you have set a 1-hour timeout and a response entity is still not subscribed after an hour, something seems wrong with your stream processing. Can you make sure your stream is not stalled for some reason?
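One way to look for such a stall, assuming only the standard Akka Streams `log` operator: put a logging stage between the steps of the graph and see which one goes quiet first. `traced` is a hypothetical helper name:

```scala
import akka.NotUsed
import akka.event.Logging
import akka.stream.Attributes
import akka.stream.scaladsl.Flow

object StallDebugging {
  // Pass-through stage that logs every element, completion and failure.
  // Elements and completion go to INFO so they show up with the default log level.
  def traced[A](name: String): Flow[A, A, NotUsed] =
    Flow[A]
      .log(name)
      .withAttributes(
        Attributes.logLevels(
          onElement = Logging.InfoLevel,
          onFinish = Logging.InfoLevel,
          onFailure = Logging.ErrorLevel
        )
      )
}
```

Placed, for example, right after the connection pool flow and again after the unmarshalling step: if the first keeps logging elements while the second stops, the stall is downstream of the pool, and the responses in between are the ones whose entities never get subscribed.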