Recover From Error in Broadcast or Unzip

I’m trying to build out a fork/join flow. I’ve found a few reference implementations and relevant questions on Stack Overflow:


I’ve basically landed at this implementation:

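import akka.NotUsed
import akka.stream.{ActorAttributes, FlowShape, Graph, SinkShape, Supervision}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Zip}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object ForkJoinFlow {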
  private val decider = Supervision.restartingDecider

  def alsoFlow[T, U](flow: Flow[T, U, NotUsed],
                     errorSink: Graph[SinkShape[(Try[U], Try[T])], _] = Sink.ignore
                    ): Flow[T, (U, T), NotUsed] =
    forkJoinFlow(flow, Flow[T], errorSink)

  def forkJoinFlow[T, U, V](flow1: Flow[T, U, NotUsed],
                            flow2: Flow[T, V, NotUsed],
                            errorSink: Graph[SinkShape[(Try[U], Try[V])], _] = Sink.ignore
                           ): Flow[T, (U, V), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

//      val unzip = b.add(UnzipWith[T, T, T]({t => (t, t)}))
      val bcast = b.add(Broadcast[T](2))
      val zip = b.add(Zip[Try[U], Try[V]]())

      bcast ~> flow1
        .map(Success.apply)
        .recover { case NonFatal(e) => Failure(e) } ~> zip.in0
      bcast ~> flow2
        .map(Success.apply)
        .recover { case NonFatal(e) => Failure(e) } ~> zip.in1

      FlowShape(bcast.in, zip.out)
    })
      .addAttributes(ActorAttributes.supervisionStrategy(decider))
      .divertTo(errorSink, { case (u, v) => u.isFailure || v.isFailure})
      .collect { case (Success(u), Success(v)) => (u, v) }

  implicit class FlowExt[In, Out, Mat](flow: Flow[In, Out, Mat]) {
    def also[U](flow1: Flow[Out, U, NotUsed],
                errorSink: Graph[SinkShape[(Try[U], Try[Out])], _] = Sink.ignore
               ): Flow[In, (U, Out), Mat] = {
      flow.via(alsoFlow(flow1, errorSink))
    }

    def forkJoin[U, V](
                        flow1: Flow[Out, U, NotUsed],
                        flow2: Flow[Out, V, NotUsed],
                        errorSink: Graph[SinkShape[(Try[U], Try[V])], _] = Sink.ignore
                      ): Flow[In, (U, V), Mat] = {
      flow.via(forkJoinFlow(flow1, flow2, errorSink))
    }
  }
}

The following are my tests that describe desired functionality:

class ForkJoinFlowSpec extends AsyncFreeSpec with Matchers {
  import ForkJoinFlow._

  implicit val actorSystem = ActorSystem()
  implicit val ec = actorSystem.dispatcher

  val flow = Flow[String]
    .forkJoin[Int, String](
      Flow[String].mapAsync(1) { str => Future.fromTry(Try(str.toInt)) },
      Flow[String].mapAsync(1) { str => Future.fromTry(Try { str.substring(0, 2)}) },
      Sink.foreach { it: (Try[Int], Try[String]) => println(s"error: $it") })

  "The fork Join flow" - {
    "should join many inputs together" in {
      Source.fromIterator(() => Seq("11", "22", "33").iterator)
        .via(flow)
        .runWith(Sink.fold(Seq.empty[(Int, String)]) { case (agg, next) => agg :+ next })
        .map { result =>
          result shouldBe Seq((11, "11"), (22, "22"), (33, "33"))
        }

    }
    "should ignore members in the second flow that correspond to failures in the first" in {
      Source.fromIterator(() => Seq("11", "bb", "33").iterator)
        .via(flow)
        .runWith(Sink.fold(Seq.empty[(Int, String)]) { case (agg, next) => agg :+ next })
        .map { result =>
          result shouldBe Seq((11, "11"), (33, "33"))
        }
    }
    "should ignore members in the first flow that correspond to failures in the second" in {
      Source.fromIterator(() => Seq("11", "2", "33").iterator)
        .via(flow)
        .runWith(Sink.fold(Seq.empty[(Int, String)]) { case (agg, next) => agg :+ next })
        .map { result =>
          result shouldBe Seq((11, "11"), (33, "33"))
        }
    }
  }
}

I get the same behavior with either Broadcast or Unzip. If I leave out the supervision strategy, the output is truncated: the result is Seq((11, "11")) for both of the latter tests.

If I include the supervision strategy, the pairs no longer line up: the 2nd element from the successful flow is paired with the 3rd element from the flow that had an error, i.e. List((11,11), (2,33)) and List((11,11), (33,bb)).
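Both symptoms can be reproduced without running a stream. The sketch below is a simplified model in plain Scala, not Akka's implementation, and `ForkJoinModel` and its methods are hypothetical names. It assumes the documented behavior of three stages: `recover` emits one last element and then completes its branch, `mapAsync` drops a failed element when the supervision decision is Resume or Restart, and `Zip` pairs elements by position and completes as soon as either input completes.

```scala
import scala.util.{Success, Try}

object ForkJoinModel {
  // One branch with no supervision: successes up to the first failure, then the
  // single Failure emitted by `recover`, after which the branch completes.
  def withRecover[A](results: List[Try[A]]): List[Try[A]] = {
    val (ok, rest) = results.span(_.isSuccess)
    ok ++ rest.take(1)
  }

  // One branch under a restarting/resuming decider: mapAsync drops the failed
  // element and keeps going, so the branch emits fewer elements than it received.
  def withRestart[A](results: List[Try[A]]): List[A] =
    results.collect { case Success(a) => a }

  // The per-element results of the two functions used in the tests.
  private def branches(input: List[String]): (List[Try[Int]], List[Try[String]]) =
    (input.map(s => Try(s.toInt)), input.map(s => Try(s.substring(0, 2))))

  // List.zip pairs by position and stops at the shorter side, which mirrors Zip
  // completing as soon as either input completes. The collect stands in for
  // divertTo + collect: pairs containing a Failure never reach the main output.
  def noSupervision(input: List[String]): List[(Int, String)] = {
    val (left, right) = branches(input)
    withRecover(left).zip(withRecover(right)).collect {
      case (Success(i), Success(str)) => (i, str)
    }
  }

  def restarted(input: List[String]): List[(Int, String)] = {
    val (left, right) = branches(input)
    withRestart(left).zip(withRestart(right))
  }

  def main(args: Array[String]): Unit = {
    println(noSupervision(List("11", "bb", "33"))) // List((11,11))
    println(restarted(List("11", "bb", "33")))     // List((11,11), (33,bb))
    println(restarted(List("11", "2", "33")))      // List((11,11), (2,33))
  }
}
```

In this model the fan-out stage is not the culprit: Broadcast hands every element to both branches, but once a branch stops emitting exactly one element per input, Zip has no way to know which elements belong together.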

It seems to me that the FanOut shape isn’t recovering from an exception the way I expect it to. Is this a bug, or am I doing something wrong? (Or is there a better strategy for getting this kind of behavior?)

I think the answer is to handle errors inside a flatMapConcat:

val flow = Flow[String]
  .forkJoin(
    Flow[String].flatMapConcat { item =>
      Source.single(item)
        .map { str => str.toInt }
        .map(Success.apply)
        .recover { case NonFatal(e) => Failure(e) }
    },
    Flow[String].flatMapConcat { item =>
      Source.single(item)
        .map { _.substring(0, 2) }
        .map(Success.apply)
        .recover { case NonFatal(e) => Failure(e) }
    })
  .collect { case (Success(int), Success(pre)) => (int, pre) }

and simplify the forkJoin flow itself:

  def forkJoinFlow[T, U, V](flow1: Flow[T, U, NotUsed],
                            flow2: Flow[T, V, NotUsed]): Flow[T, (U, V), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[T](2))
      val zip = b.add(Zip[U, V]())


      bcast ~> flow1 ~> zip.in0
      bcast ~> flow2 ~> zip.in1

      FlowShape(bcast.in, zip.out)
    })
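The simplified graph can rely on Zip's positional pairing because each inner flow now emits exactly one Try per input. A minimal plain-Scala model of that invariant, with a hypothetical `forkJoin` over lists standing in for the stream:

```scala
import scala.util.{Success, Try}

object AlignedForkJoinModel {
  // Each branch maps every input to exactly one Try, so both sides always have
  // the same length and position i on the left belongs to position i on the right.
  def forkJoin[T, U, V](input: List[T])(f: T => U, g: T => V): List[(U, V)] =
    input.map(t => Try(f(t))).zip(input.map(t => Try(g(t)))).collect {
      case (Success(u), Success(v)) => (u, v)
    }

  def main(args: Array[String]): Unit = {
    val toInt: String => Int = _.toInt
    val prefix: String => String = _.substring(0, 2)
    println(forkJoin(List("11", "bb", "33"))(toInt, prefix)) // List((11,11), (33,33))
    println(forkJoin(List("11", "2", "33"))(toInt, prefix))  // List((11,11), (33,33))
  }
}
```

These are the results the second and third tests expect.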

However, each element is now processed in its own single-element substream, so the inner flows cannot aggregate across elements (for example, batching or running several elements concurrently).

Optionally, you can group around the forkJoin. But I still think that the flows downstream of the Broadcast should be able to recover from failed futures and thrown exceptions.
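A sketch of an alternative to the per-element flatMapConcat, assuming Akka 2.6 (where an implicit ActorSystem supplies the Materializer): capture the failure inside the Future that mapAsync waits on, using the standard `Future.transform`, so each inner flow stays one ordinary stream and still emits exactly one Try per input. `tryMapAsync`, `PerElementRecovery` and `run` are hypothetical names, not part of Akka.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Success, Try}

object PerElementRecovery {
  // Hypothetical helper: runs f per element and turns both a failed Future and a
  // synchronous throw into a Failure element, so the flow itself never fails and
  // emits exactly one Try per input (mapAsync preserves input order).
  def tryMapAsync[T, U](parallelism: Int)(f: T => Future[U])(
      implicit ec: ExecutionContext): Flow[T, Try[U], NotUsed] =
    Flow[T].mapAsync(parallelism) { t =>
      val fut = try f(t) catch { case NonFatal(e) => Future.failed(e) }
      fut.transform((result: Try[U]) => Success(result))
    }

  // The simplified Broadcast/Zip graph from the post.
  def forkJoinFlow[T, U, V](flow1: Flow[T, U, NotUsed],
                            flow2: Flow[T, V, NotUsed]): Flow[T, (U, V), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[T](2))
      val zip = b.add(Zip[U, V]())
      bcast ~> flow1 ~> zip.in0
      bcast ~> flow2 ~> zip.in1
      FlowShape(bcast.in, zip.out)
    })

  // Runs the fork/join over the input and keeps only pairs where both sides succeeded.
  def run(input: List[String]): Seq[(Int, String)] = {
    implicit val system: ActorSystem = ActorSystem("per-element-recovery")
    import system.dispatcher
    try {
      val flow = forkJoinFlow(
        tryMapAsync[String, Int](1)(s => Future(s.toInt)),
        tryMapAsync[String, String](1)(s => Future(s.substring(0, 2)))
      ).collect { case (Success(i), Success(str)) => (i, str) }
      Await.result(Source(input).via(flow).runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run(List("11", "bb", "33")))
}
```

The trade-off is that this only covers errors raised by the per-element function. A stage that fails the whole branch, or that emits more or fewer elements than it receives, will still break Zip's pairing.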