Convert a Flow[In, Out, Mat] into a Flow[In, Try[Out], NotUsed]


I want to convert a Flow[In, Out, Mat] into a Flow[In, Try[Out], NotUsed]. The following works for me:

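  def `try`[In, Out, Mat](flow: Flow[In, Out, Mat])(implicit ec: ExecutionContext, materializer: Materializer): Flow[In, Try[Out], NotUsed] = {
    def fut(in: In): Future[Try[Out]] =
      Source.single(in).via(flow).runWith(Sink.last) // reconstructed: one materialization per element, as discussed in the replies
        .map[Try[Out]](Success(_))                   // reconstructed
        .recover[Try[Out]] { case t: Throwable => Failure(t) }
    Flow[In].mapAsync(1)(fut) // reconstructed; the parallelism originally used is not known
  }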

but it involves a mapAsync which I would like to avoid.
I tried writing a graph stage based on Recover, but that turned out to be a dead end because by the time it is executed
the in port is already closed. Using a SupervisionStrategy did not help either. Is there a better solution?

Thanks for any suggestion!

If what you are after is a way to take an arbitrary Flow[A, B] and turn the failure of any element, in any operator inside it, into an in-stream failure element, there is no such tool, and making one would mean building it into the Akka Streams engine itself. Since stages can change the type, batch or re-order elements, and fan out and in to form more complex graphs, I think this would be somewhere between very hard and impossible to achieve.
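To illustrate why there is no general element-to-element correspondence, here is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem provides the materializer). The object name `NoOneToOne` and the `batching` flow are made up for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object NoOneToOne {
  // Illustrative flow: consumes up to three inputs for every output it emits.
  val batching: Flow[Int, Int, NotUsed] = Flow[Int].grouped(3).map(_.sum)

  // Runs seven inputs through the flow and returns what came out.
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("no-one-to-one")
    try Await.result(Source(1 to 7).via(batching).runWith(Sink.seq), 5.seconds)
    finally Await.result(system.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit =
    println(runDemo()) // seven inputs produce three outputs: 6, 15, 7
}
```

If summing one of those groups failed, there would be no single input element to attach the Failure to, which is the difficulty described above.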

The best way is probably to handle failure at the places where the logic can fail: for example, wrap the body of .map operations in Try directly, and use Future.transform to lift the internal Try of a future into a Future[Try] before using it with .mapAsync.
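A minimal sketch of that suggestion, assuming Akka 2.6 and Scala 2.12 or later (for the single-argument Future.transform). The functions `parse` and `lookup` are hypothetical stand-ins for logic that can fail; they are not part of any library.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Success, Try}

object SafeStages {
  // Hypothetical element-level logic; both functions can fail.
  def parse(s: String): Int = s.toInt // throws NumberFormatException on bad input
  def lookup(i: Int): Future[Int] =
    if (i >= 0) Future.successful(i * 2)
    else Future.failed(new IllegalArgumentException(s"negative: $i"))

  // Synchronous step: Try captures the exception as a Failure element,
  // so the stream itself does not fail.
  val safeParse: Flow[String, Try[Int], NotUsed] =
    Flow[String].map(s => Try(parse(s)))

  // Asynchronous step: transform moves the future's own Try into its value,
  // so the future handed to mapAsync always completes successfully.
  def safeLookup(implicit ec: ExecutionContext): Flow[Int, Try[Int], NotUsed] =
    Flow[Int].mapAsync(parallelism = 1) { i =>
      lookup(i).transform[Try[Int]](t => Success(t))
    }

  // Runs both flows on sample input and returns the collected elements.
  def runDemo(): (Seq[Try[Int]], Seq[Try[Int]]) = {
    implicit val system: ActorSystem = ActorSystem("safe-stages")
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      val parsed = Source(List("1", "x")).via(safeParse).runWith(Sink.seq)
      val looked = Source(List(1, -1)).via(safeLookup).runWith(Sink.seq)
      (Await.result(parsed, 5.seconds), Await.result(looked, 5.seconds))
    } finally Await.result(system.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val (parsed, looked) = runDemo()
    println(parsed) // a Success followed by a Failure
    println(looked) // a Success followed by a Failure
  }
}
```

This only helps where you control the function passed to the operator. It does not apply when all you have is an opaque pre-built Flow.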

The only alternative I can think of would be to take each element and materialize a new instance of that Flow that handles only that one element, to find out whether it succeeded or failed. That will be very inefficient, since it needs to start up a stream engine for each element.

Thank you @johanandren. Unfortunately I do not have access to a mapping function f: A => B, all I have is the Flow[A, B, _] itself. My context is the composition of pre-built flows into integration pipelines using control elements that model if, pattern matches, loops, try etc. and of course also plain via.

Unless I misunderstand, it is your suggested alternative that I have implemented. In my post I should have been more concerned about the extra materialization than about the subsequent mapAsync.
The flows I want to convert typically involve communication with external systems and/or processing requests and responses using Alpakka components, so I am hoping that the additional materialization costs are negligible in comparison.

Correctly understood :)

And your thought that the other overhead will be greater also makes sense, but it depends on the use case (how many elements, how many of these flows will be running, etc.), so make sure to benchmark and verify that it delivers whatever throughput you need.

Can’t you program defensively around this? If you build the flow from the pre-built pieces, can’t you catch the “bad” elements at the beginning of the flow? Also, are you sure that you can’t make the pre-built flows safe? (I mean fixing the library or other code so that it only generates, wraps, and handles safe values.)

For me, every time I tried to do something like this, the problem was rooted in the context, and I needed to fix the context and not the symptom.

Btw, the .runWith(Sink.last) is probably not what you want… You want Sink.seq, followed by a mapConcat that emits the collected elements back into the stream (if your flow really can absorb and generate elements).
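A minimal sketch of that variant, assuming Akka 2.6. The name `tryAll`, its `parallelism` parameter, and the `twoPerInput` demo flow are made up for illustration. Each input still gets its own materialization of the wrapped flow, so the cost discussed earlier in the thread still applies.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success, Try}

object TryFlow {
  // Runs `flow` once per input element in its own materialization. Emits every
  // output of that run as a Success, or one Failure if the run failed.
  def tryAll[In, Out, Mat](flow: Flow[In, Out, Mat], parallelism: Int = 1)(
      implicit ec: ExecutionContext, mat: Materializer): Flow[In, Try[Out], NotUsed] =
    Flow[In]
      .mapAsync(parallelism) { in =>
        Source.single(in).via(flow).runWith(Sink.seq)
          .transform[immutable.Seq[Try[Out]]] {
            case Success(outs) => Success(outs.map(o => Success(o): Try[Out]))
            case Failure(t)    => Success(immutable.Seq(Failure(t)))
          }
      }
      .mapConcat(outs => outs) // flatten each per-element batch back into the stream

  // Illustrative flow: emits two elements per input, throws for input 0.
  val twoPerInput: Flow[Int, Int, NotUsed] =
    Flow[Int].mapConcat(i => List(i, 10 / i))

  def runDemo(): Seq[Try[Int]] = {
    implicit val system: ActorSystem = ActorSystem("try-flow")
    implicit val ec: ExecutionContext = system.dispatcher
    try Await.result(
      Source(List(5, 0, 2)).via(tryAll(twoPerInput)).runWith(Sink.seq), 5.seconds)
    finally Await.result(system.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = runDemo().foreach(println)
}
```

Two caveats with this sketch: outputs that a run emitted before failing are dropped in favour of the single Failure, and the materialized value of the wrapped flow is discarded.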