HTTP entity Source expires when processed by an Akka stream under load

Hi all

My application has the following logic as part of its main Akka Streams flow:

  1. Make an HTTP request and obtain the response entity’s data as a Source
  2. Process the entity bytes (in this particular case, I’m using Alpakka to upload the content to S3, but this may change later)
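For reference, here is a minimal sketch of the two-step shape described above. It assumes Akka 2.6 with Akka HTTP’s request-level API (Http().singleRequest). The object and method names, the parallelism values, and the generic upload sink (standing in for the Alpakka S3 sink) are hypothetical and are not the actual application code:

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

import scala.concurrent.Future

object TwoStagePipeline {
  // Hypothetical stand-in for the real upload sink (e.g. an Alpakka S3 upload).
  type UploadSink = Sink[ByteString, Future[Done]]

  def fetchThenUpload(upload: UploadSink)(implicit system: ActorSystem): Flow[HttpRequest, Done, NotUsed] =
    Flow[HttpRequest]
      // Step 1: fire the request. The pool slot stays occupied until the
      // response entity is subscribed to and consumed.
      .mapAsync(parallelism = 4)(req => Http().singleRequest(req))
      // Only the not-yet-consumed entity stream is passed downstream.
      .map(_.entity.dataBytes)
      // Step 2: while this stage is busy (backpressure), completed responses wait
      // in the buffer between the stages, possibly longer than the default
      // 1-second subscription timeout.
      .mapAsync(parallelism = 1)(bytes => bytes.runWith(upload))
}
```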

Under load, it sometimes takes longer than 1 second for the Source to reach the next step and be subscribed to by that step’s Sink, causing this error:

[0 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to... [snipped]

It is possible to perform both of these actions in the same flow step, but that approach has drawbacks. Is there another idiomatic, Akka-intended way to prevent this situation? Simply tweaking the parallelism of both flow steps or the subscription timeout isn’t a good idea; that just sweeps the problem under the rug until the worst possible moment…
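The following sketch shows roughly how both actions could be merged into a single step. It makes the same assumptions (Akka 2.6, the request-level API, hypothetical names, and a generic upload sink in place of the Alpakka S3 sink). The entity is subscribed to in the same future chain that receives the response, so it never waits unconsumed in a buffer. One evident trade-off is that fetching and uploading now share a single parallelism setting:

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

import scala.concurrent.Future

object SingleStagePipeline {
  // Hypothetical helper: request and upload happen inside one mapAsync stage.
  def fetchAndUpload(upload: Sink[ByteString, Future[Done]], parallelism: Int)(
      implicit system: ActorSystem): Flow[HttpRequest, Done, NotUsed] = {
    import system.dispatcher // ExecutionContext for flatMap
    Flow[HttpRequest].mapAsync(parallelism) { req =>
      Http().singleRequest(req).flatMap { response =>
        // Subscribe to the entity as soon as the response arrives.
        response.entity.dataBytes.runWith(upload)
      }
    }
  }
}
```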

Any advice is appreciated.

Thanks
Micky

Hi @idpwf,

thanks for the report. Which client API do you use?

By default, the subscription timeout is set to an aggressively short interval so that people notice that not making progress on a response stream blocks the pool slot from being used for other requests. There are a few ways to deal with this situation:

  • Increase the subscription timeout to a value that suits your queuing/buffering situation. This isn’t really sweeping the problem under the rug; it’s tuning to your conditions. By increasing the timeout, you acknowledge that blocking a pool slot for the additional queuing time is acceptable in your application.
  • Try to reduce queuing time by whatever means are suitable.
  • After receiving a response, run toStrict right away to load the response entity into memory and free the pool slot. This works if response entities are small enough to fit into memory. Afterwards, the response is detached from the underlying connection and can be held in your own buffers for as long as you like (and as long as there’s enough memory).
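The first and third options might look like this in Scala. The sketch assumes Akka 2.6 and Akka HTTP 10.1 or later, where the pool setting is named akka.http.host-connection-pool.response-entity-subscription-timeout. The object name and the 10-second and 5-second values are arbitrary illustrations:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpResponse}
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.Future
import scala.concurrent.duration._

object SubscriptionTimeoutOptions {
  // Option 1: raise the pool-wide subscription timeout (default: 1 second).
  // 10s is an example value; choose one that matches your expected queuing time.
  val config: Config = ConfigFactory
    .parseString("akka.http.host-connection-pool.response-entity-subscription-timeout = 10s")
    .withFallback(ConfigFactory.load())

  def system(): ActorSystem = ActorSystem("client", config)

  // Option 3: read the whole entity into memory right away. The pool slot is
  // released once the bytes are read, and the strict entity can be buffered freely.
  def strictEntity(response: HttpResponse)(implicit system: ActorSystem): Future[HttpEntity.Strict] =
    response.entity.toStrict(5.seconds)
}
```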

Would one of those work?

Btw, a similar report was made here before in the context of an Alpakka implementation.

Johannes

Hi Johannes

I’m using the request-level API, and the current implementation calls toStrict; the goal of this refactoring is to eliminate that, for memory-pressure reasons :)

Reducing the queuing time or setting an arbitrarily longer timeout isn’t an option, since backpressure from later (longish-running) stages may cause the response entity Sources to stall in the flow for some time.

I’ll probably end up delaying the creation of the Source until the flow is ready to handle it.
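Here is one way that deferral could be expressed. This sketch assumes Akka 2.6, which provides Source.lazyFutureSource. The earlier step emits a lazy Source that fires the request only once the consuming stage materializes and pulls it, so the response entity is subscribed to as soon as it arrives. All names and the generic upload sink are hypothetical:

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object DeferredRequestPipeline {
  // Nothing is sent until this Source is materialized and receives demand.
  def deferredEntity(req: HttpRequest)(implicit system: ActorSystem): Source[ByteString, Future[Any]] = {
    import system.dispatcher // ExecutionContext for map
    Source.lazyFutureSource { () =>
      Http().singleRequest(req).map(_.entity.dataBytes)
    }
  }

  def pipeline(upload: Sink[ByteString, Future[Done]])(implicit system: ActorSystem): Flow[HttpRequest, Done, NotUsed] =
    Flow[HttpRequest]
      // Cheap step: only a lazy description of the request travels downstream.
      .map(req => deferredEntity(req))
      // Running the Source here fires the request, and the entity is
      // subscribed to within the same materialization.
      .mapAsync(parallelism = 1)(src => src.runWith(upload))
}
```

Because the lazy Source is materialized only by the consuming stage, no pool slot is held while it waits in the flow. The cost is that the request itself starts later.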

Thanks anyway
Micky