Akka Streams - Define timeout for mapAsync

Hi,
I am using Akka Streams (in Java) with an external service whose methods take a long time.
I don’t want these methods to block, so I defined a flow that uses mapAsyncUnordered as follows:

Flow.of(SomeClass.class).mapAsyncUnordered(numThreads, someService::someMethod);

I want to define a timeout for the method running in the mapAsyncUnordered stage, so that a call that takes too long does not fill the mapAsyncUnordered queue and prevent other elements from passing.

The only option I saw is to define a separate actor and use its “ask” method, but as described in the documentation, if the timeout is reached, the whole stream is terminated with a failure. That is not good for me, even though the stream can be recovered, because I can’t lose data.
https://doc.akka.io/docs/akka/current/stream/futures-interop.html

Is there another option?

Thank you!

I think in this case you need to handle the timeout “out of stream”, or you need to implement a new stage for that. I would vote for the first.

Also, the built-in Scala futures have no real timeout support. The “best” solution I have seen is racing two futures and taking whichever finishes first, but this does not kill the other, still-running future, so please don’t do that. The Cats IO monad and ZIO can time out properly, but I think neither has an easy-to-use Java API.

With actors the problem is the same: you time out (which means the other party did not respond with a reply message), but the computation is not stopped, so the actor (or the future inside the actor) will still be waiting for the response from the API.

If you have the chance to use it, I think the akka-http client can handle timeouts. You also probably want to recover the failed future and wrap the result in an Either or a similar data structure, so that you can drop, log, or retry the failed requests.
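
To illustrate the "recover and wrap" idea, here is a minimal sketch using only the JDK. The names RecoverSketch, Attempt and recovering are made up for this example and are not part of Akka. It assumes the service method returns a CompletionStage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class RecoverSketch {

    // Hypothetical Either-like holder: keeps the original input together with
    // either the value or the error, so a failed element is not lost.
    static final class Attempt<I, O> {
        final I input;
        final O value;
        final Throwable error;

        Attempt(I input, O value, Throwable error) {
            this.input = input;
            this.value = value;
            this.error = error;
        }

        boolean isSuccess() {
            return error == null;
        }
    }

    // Wraps an async call so that a failed stage becomes a normal Attempt
    // carrying the error, instead of a failed stage. Exceptions thrown
    // synchronously by call.apply itself are not caught here.
    static <I, O> Function<I, CompletionStage<Attempt<I, O>>> recovering(
            Function<I, CompletionStage<O>> call) {
        return input -> call.apply(input)
                .handle((value, error) -> new Attempt<I, O>(input, value, error));
    }
}
```

Because the returned stage does not fail when the wrapped call fails, a mapAsyncUnordered stage using such a function should not terminate the stream on a failed call. A later stage can then check isSuccess() and decide whether to drop, log, or retry the element using the stored input.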

If you are on JDK 9 or later, there are a few new methods, CompletableFuture#orTimeout and CompletableFuture#completeOnTimeout, that could be useful.
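
A rough sketch of how those two methods could be applied, assuming JDK 9+ and a service method that returns a CompletionStage. The class and method names (TimeoutSketch, withTimeout, withFallback) are invented for this example. Both helpers apply the timeout to a derived future, so the future returned by the service is left untouched.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class TimeoutSketch {

    // orTimeout variant: the returned stage completes with Optional.empty()
    // when the call fails or does not finish within `timeout`.
    static <I, O> Function<I, CompletionStage<Optional<O>>> withTimeout(
            Function<I, CompletionStage<O>> call, Duration timeout) {
        return input -> call.apply(input)
                .toCompletableFuture()
                // thenApply creates a dependent future, so the timeout below
                // does not complete the service's own future.
                .thenApply(value -> Optional.ofNullable(value))
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .exceptionally(error -> Optional.empty());
    }

    // completeOnTimeout variant: the returned stage completes with `fallback`
    // when the call does not finish within `timeout`. A failure of the call
    // itself still fails the returned stage.
    static <I, O> Function<I, CompletionStage<O>> withFallback(
            Function<I, CompletionStage<O>> call, Duration timeout, O fallback) {
        return input -> call.apply(input)
                .toCompletableFuture()
                .copy() // JDK 9+: a new future that mirrors the original
                .completeOnTimeout(fallback, timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

The function returned by either helper could be passed as the second argument of mapAsyncUnordered in place of someService::someMethod (explicit type arguments may be needed). As noted above, the timeout only releases the slot in the stage: the underlying call is not stopped and keeps running until the service responds. If elements must not be lost, the empty Optional would need to be replaced by something that also carries the original input.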