A lot requests results in "response entity was not subscribed after ..."

Hi. I’ve already asked this question on StackOverflow, but nobody helps me. You can check it here. I will duplicate my problem here:

I have an akka-streams application, and there is a graph which includes a bunch of http calls. It works perfect for a while (10-20 minutes), and after some time fails with the next reason:

[LoaderActorSystem-akka.actor.default-dispatcher-54] [LoaderActorSystem/Pool(shared->https://chpp.hattrick.org:443)] [3 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 5 minutes. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /chppxml.ashx Empty -> 200 OK Default(18198 bytes)

As recommended here and at the documentation I’ve implemented parsing of the requests next way:

    val httpsFlow = Http().cachedHostConnectionPoolHttps[T](URL)

    val unmarshallFlow = Flow[(Try[HttpResponse], T)].mapAsync(2){
      case (Success(response), t) =>
        response.entity.dataBytes
          .runReduce(_ ++ _)
          .map(data => (data.utf8String, t))
    }
      .map(parse)

    httpsFlow.via(unmarshallFlow).async

So, before this error my app performs thousands of http calls, I can’t find the reason why the problems emerge.

Subscription timeout config is 5 minutes(changed it from 10 seconds to 1 hour, it doesn’t help):

akka.http.host-connection-pool.response-entity-subscription-timeout = 5.minute

Also changing the unmarshalling code to Unmarshal(result.entity).to[String] give no results also.

Hey… 2 weeks, no answer :(
Can anyone give me advice how can I debug or narrow down the problem?

Very hard to diagnose without knowing more about the app and what it does. Nothing obviously wrong with that sample at least (you could use toStrict instead of runReduce though).

Usually that exception is because of forgetting to consuming/discard all or some of the response entities, for example consuming it for OK responses but forgetting to discard for error status codes.

It could perhaps be caused by dispatcher/execution context starvation, if you do accidental blocking elsewhere so that the response handling stream reduce does not get to execute. I think that should subscribe but just be slow though. (Good overview of starvation in the Akka docs here: Dispatchers • Akka Documentation)

There’s also a real issue with the flow-based client APIs where incoming responses can be stuck in buffers because of backpressure and where it is hard to find good values for the timeout.

See these issues / comments for more information:

Here are two possible solutions:

  • Use mapAsync with singleRequest and instantly use toStrict to load the response into memory (if that is what you want / need)
  • Keep using stream-based APIs. If you are certain that all entities are read eventually, increase or disable the timeout. After all it’s thought to be a fallback mechanism that prevents complete stalling of a pool.

Regarding your code, what your flow-based approach roughly corresponds to is:

requests
  .mapAsyncUnordered(max-connections)(Http().singleRequest) // internal representation of flow-based API
  .mapAsync(2)(your processing)
  // further processing

mapAsync introduces buffers, so even without any data pulled from the sink, there can be max-connections+2 responses buffered for processing, for which only the 2 of your own mapAsync will subscribe and the others will timeout if the stream is backpressured. What you can do instead is

requests
  .mapAsync(whatEverYouNeed) { req =>
    Http().singleRequest(req).flatMap(yourProcessingThatReadsEntity)
  }
  // further processing

That will make sure that every response you receive will be processed immediately instead of being stuck in an internal buffer.

Hi, thanks for your answer.

  • Keep using stream-based APIs. If you are certain that all entities are read eventually, increase or disable the timeout. After all it’s thought to be a fallback mechanism that prevents complete stalling of a pool.

Unfortunately this doesn’t work for me. I had set up 1-hour timeout, and after a while my whole flow hangs again and throws an exception after an hour.

Also I’ve tried to use single-request flow, and this doesn’t help as well. The same stuff - flow fails with the same “Response entity was not subscribed after…” error.

It should be impossible, right? Seems that I’m doing something wrong.

If it hard to diagnose , I can provide the actual code , It’s here: here. I’ve commented there “old” flow-based api

So sad, that I can’t do anything with it. This problem prevents me from using Akka in my projects, although the project is almost done :frowning:

I even ready to pay a money for the help :)

You still use two mapAsync calls which will introduce a second buffer.

That said, if you have set a 1-hour timeout and you don’t subscribe to a response entity in one hour, something seems wrong with your stream processing. Can you make sure your stream is not stalled for some reason?

I’ve come across this issue too, or something very like it.

I had a downstream bug where my final Sink.actorRef wasn’t sending acks correctly, so nothing was actually getting to it.

But the flow was able to process around 18 HTTP requests before it crashed with the ‘not subscribed after 1 second’ message.

I can fix the downstream bug and I’ll probably never see the error again, but it raises the worry that any downstream delay that happens to occur in production could cause the flow to crash instead of backpressuring.

My intuition was that the downstream red light should have propagated upstream instantly, so that no HTTP requests should have taken place. But evidently there was a buffer between the HTTP processing and the sink. This buffer initially gave the green light for HTTP requests to start, and then that light changed to red when there were half-processed responses that didn’t have any room to proceed.

I would like to get more understanding and visibility and control of those buffers. How big are they; do they go in between every stage of the explicit flow (like corridors of a certain capacity), or are they attached to the input and output of given components; is there a way of calibrating their size?

I’ve been experimenting with cachedHostConnectionPool because I’ve been hitting max-open-requests limits when using singleRequest, and I was hoping that the flow element would just backpressure if there was any chance of going over that limit. It would be very nice to have a Flow that won’t crash, so I don’t have to write code to monitor and retry. It would be very nice not to have pull calibration numbers out of the air - to be able to just let the Flow determine its own safe rate.

Back to the response entity subscription delay… There’s a critical section where an element shouldn’t be able to enter unless it can get to the end without any chance of getting stuck. So it needs a green light both from the start of the critical section and from the end of the critical section. Can this be implemented by having an alsoTo at the start of the critical section that sends a copy of the request to the end - the documentation says that that alsoTo uses backpressure information from both destinations. Downstream of the critical section will have the requests and the responses interspersed, or maybe a way of putting it is that the request is holding a space for its response. The requests have to be filtered out at some point, but I haven’t been able to find a safe way of doing it inside the flow.

Does that reasoning work? Will it guarantee that the http requests will get through the critical section without being held up?

Sample code below. Running in openjdk11, akka 2.6.15, akka-http 10.2.4. With the alsoTo line commented out, it crashes after 18 http requests get through. With the alsoTo line included, it ran for a few minutes anyway without crashing.

import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CompletionStage;

import akka.actor.ActorSystem;
import akka.event.Logging;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.Uri;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.Attributes.LogLevels;
import akka.stream.DelayOverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

public class ResponseSubscriptionIssue {

  // https://discuss.lightbend.com/t/stream-mapasync-1-vs-single-threaded-execution-context-with-stream-mapasync-16-to-maintain-ordering/886

  // https://discuss.lightbend.com/t/a-lot-requests-results-in-response-entity-was-not-subscribed-after/7797/5

  // https://github.com/akka/alpakka/issues/2296#issuecomment-627897852

  // https://github.com/akka/akka-http/issues/2120

  // https://discuss.lightbend.com/t/difference-between-http-singlerequest-and-http-superpool/713/2

  private static class CorrelatedRequest {
    private final HttpRequest req;
    private final int n;

    CorrelatedRequest(Uri uri, int n) {
      var pathAmended = uri.path() + "/" + n + "/" + LocalTime.now();
      var uriAmended = uri.path(pathAmended).toString();
      this.req = HttpRequest.GET(uriAmended);
      this.n = n;
    }

    Pair<HttpRequest, Object> toPair() {
      return new Pair<HttpRequest, Object>(req, n);
    }

    public String toString() {
      return String.format("CorrelatedRequest(%d, %s)", n, req);
    }

  }

  private static CompletionStage<String> responseBodyString(ActorSystem system,
    HttpResponse httpResponse) {
    long strictTimeoutMillis = Duration.ofSeconds(10).toMillis();
    CompletionStage<HttpEntity.Strict> strict = httpResponse
      .entity()
      .toStrict(strictTimeoutMillis, system);
    return strict
      .thenApply(HttpEntity.Strict::getData)
      .thenApply(ByteString::utf8String);
  }

  private static Attributes logAttributes() {
    var info = Logging.InfoLevel();
    LogLevels logLevels = new LogLevels(info, info, info);
    return Attributes.none().and(logLevels);
  }

  public static void main(String[] args) {

    var logAttributes = logAttributes();

    var system = ActorSystem.create();
    var log = Logging.getLogger(system, "test");
    var uri = Uri.create("http://127.0.0.1:8080/echo.php");
    var http = Http.get(system);

    var sink = Flow
      .of(Object.class)
      // To simulate whatever downstream delay might happen to come up.
      .delay(Duration.ofSeconds(10), DelayOverflowStrategy.backpressure())
      .to(Sink.foreach(it -> {
        log.info("SINK: {}", it);
      }));

    var afterCriticalSection = Flow
      .of(Object.class)
      .log("AFTER CRITICAL SECTION", log)
      .addAttributes(logAttributes)
      .to(sink);

    var source = Source
      .range(1, Integer.MAX_VALUE)
      .map(n -> new CorrelatedRequest(uri, n))
      .map(it -> (Object) it);

    // Comment this out to replicate 'response entity was not subscribed'
    source = source.alsoTo(afterCriticalSection);

    var poolFlow = http
      .cachedHostConnectionPool(uri.getHost().toString() + ":" + uri.getPort());

    var criticalSection = Flow
      .of(Object.class)
      // .detach() // EDIT: thought this might be relevant but maybe not.
      .map(it -> (CorrelatedRequest) it)
      .map(CorrelatedRequest::toPair)
      .via(poolFlow)
      .log("RESPONSE BEFORE PROCESSING BODY", Pair::second, log)
      .map(pair -> pair.first().get())
      .mapAsync(1, res -> responseBodyString(system, res))
      .log("RESPONSE AFTER PROCESSING BODY", log)
      .addAttributes(logAttributes)
      .map(it -> (Object) it);

    source.via(criticalSection).runWith(afterCriticalSection, system);

  }

}

Hi @dcdl,

if I see it correctly, effectively, you basically added the same delay on both sides of your “critical section”. However, the whole afterCriticalSection will be materialized twice and there’s no synchronization that would ensure that both sinks are really consuming with the same speed. In your example, it is likely that the big fixed delay will be enough to throttle consumer and producer to the same rate.

In a more general case, you will not now about delays in the sink and it’s hard to dynamically replicate any slowdowns from the sink to the source.

Right now, I think there’s not much to say any more about the problem. This topic and the linked issues contain all the information. In a nutshell, there’s a fundamental issue at play here, that in akka-stream backpressure is propagated in a unit of one element (one response in this case). However, if we would strictly propagate that without introducing any buffers, we could not have any parallel requests on one pool. Therefore, we must at least introduce a buffer of size max-connections to actually be able to feed that parallel part of the pipeline regardless of whether the sink side can already process those extra responses right now or not. For that reason, the above suggested mapAsync(singleRequest) approach is the safest approach because you will always instantly be able to process any response that comes in and then mapAsync will buffer the processed response until the rest of the pipeline is able to pick up on that.

However, the whole afterCriticalSection will be materialized twice…

That breaks it alright. Instead of, as I imagined, looping back to rejoin the same stream, it’s a copy and paste of the entire downstream.