Consuming elements from a Source.queue and connecting to cachedHostConnectionPool

I am trying to process HTTP requests in a way that will not result in

Exceeded configured max-open-requests value of [32]

My idea is to use a cachedHostConnectionPool that opens a limited number of connections; while those connections are in use, the remaining requests are buffered. I will pass requests to the flow via a Source.queue.

My code currently looks like this:

    private Source<Pair<HttpRequest, Object>, SourceQueueWithComplete<Pair<HttpRequest, Object>>> sourceQueue =
            Source.queue(4, OverflowStrategy.backpressure(), 4);

    private final Flow<Pair<HttpRequest, Object>, Pair<Try<HttpResponse>, Object>, HostConnectionPool> flow =
            Http.get(system).cachedHostConnectionPool(host);

    private final Materializer materializer = SystemMaterializer.get(system).materializer();

    private SourceQueueWithComplete<Pair<HttpRequest, Object>> queue =
            sourceQueue.via(flow).toMat(Sink.foreach(p -> {
                // processing ?
            }), Keep.left()).run(materializer);

My question is: how should I process the requests and get a response out of the stream? In particular, what should I be doing here:

    Sink.foreach(p -> {
        // processing ?
    })

If I have understood correctly, Sink.foreach is responsible for consuming the elements that come out of the flow, so the processing (i.e., handling the HttpResponse) should be done here? If someone could provide an example based on this, that would be very helpful.
Here, p.first() gives a Try<HttpResponse>. Is this the result of the HttpRequest that was offered to the queue? I'm unsure of this as well.

Maybe this Scala example answers some of your questions. This is the interesting part:

Although the HttpRequest here downloads a file, the example shows how to handle the HttpResponse in the SourceQueue / cachedHostConnectionPool scenario.
Note that this example has a homegrown retry mechanism, which could be improved.
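
For readers on the Java DSL, the correlation idea can be sketched without Akka on the classpath. In the real stream, the second element of each pair is handed back unchanged next to the Try<HttpResponse>, so a CompletableFuture can ride along as that element and be completed where the results are consumed. The sketch below only models this with JDK classes: QueueCorrelationSketch, Exchange, and fakeSend are hypothetical names, worker threads stand in for pooled connections, and a blocking queue stands in for Source.queue. None of it is Akka API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class QueueCorrelationSketch {

    // Hypothetical stand-in for Pair<HttpRequest, T>: the request plus the
    // context object (here a future) that travels with it.
    static final class Exchange {
        final String request;
        final CompletableFuture<String> promise;

        Exchange(String request, CompletableFuture<String> promise) {
            this.request = request;
            this.promise = promise;
        }
    }

    // Bounded buffer, playing the role of a Source.queue with buffer size 4.
    private final BlockingQueue<Exchange> queue = new ArrayBlockingQueue<>(4);

    // Each daemon worker models one pooled connection.
    QueueCorrelationSketch(int maxConnections) {
        for (int i = 0; i < maxConnections; i++) {
            Thread worker = new Thread(this::drain, "conn-" + i);
            worker.setDaemon(true);
            worker.start();
        }
    }

    // Models the consuming side: take an element, then complete the future
    // that came in with the request, successfully or exceptionally.
    private void drain() {
        try {
            while (true) {
                Exchange e = queue.take();
                try {
                    e.promise.complete(fakeSend(e.request));
                } catch (RuntimeException ex) {
                    e.promise.completeExceptionally(ex);
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    // Hypothetical request execution; a real client would do network I/O here.
    static String fakeSend(String request) {
        if (request.isEmpty()) {
            throw new IllegalArgumentException("empty request");
        }
        return "200 OK for " + request;
    }

    // Blocks the caller while the buffer is full, then returns the future
    // that will hold the response for exactly this request.
    CompletableFuture<String> submit(String request) throws InterruptedException {
        CompletableFuture<String> promise = new CompletableFuture<>();
        queue.put(new Exchange(request, promise));
        return promise;
    }

    public static void main(String[] args) throws Exception {
        QueueCorrelationSketch client = new QueueCorrelationSketch(2);
        CompletableFuture<String> a = client.submit("GET /a");
        CompletableFuture<String> b = client.submit("GET /b");
        System.out.println(a.get(1, TimeUnit.SECONDS));
        System.out.println(b.get(1, TimeUnit.SECONDS));
    }
}
```

With the actual pool, the same completion logic would sit inside Sink.foreach, using p.first() for the result and p.second() for the future. That mapping is an assumption about how you might wire it, not something taken from the linked example.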

Hope this helps
Paul

Thanks for your reply. This is what I ended up following.