I’m trying to build a fork/join flow. I’ve found a few reference implementations and relevant questions on Stack Overflow.
I’ve basically landed on this implementation:
import akka.NotUsed
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.{Attributes, FlowShape, Graph, SinkShape, Supervision}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Zip}

import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object ForkJoinFlow {

  private val decider = Supervision.restartingDecider

  // Pairs each output of `flow` with the input element that produced it.
  def alsoFlow[T, U](flow: Flow[T, U, NotUsed],
                     errorSink: Graph[SinkShape[(Try[U], Try[T])], _] = Sink.ignore
                    ): Flow[T, (U, T), NotUsed] =
    forkJoinFlow(flow, Flow[T], errorSink)

  // Sends every element through both flows and zips the two results.
  def forkJoinFlow[T, U, V](flow1: Flow[T, U, NotUsed],
                            flow2: Flow[T, V, NotUsed],
                            errorSink: Graph[SinkShape[(Try[U], Try[V])], _] = Sink.ignore
                           ): Flow[T, (U, V), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      // val unzip = b.add(UnzipWith[T, T, T]({t => (t, t)}))
      val bcast = b.add(Broadcast[T](2))
      val zip = b.add(Zip[Try[U], Try[V]]())

      bcast ~> flow1
        .map(Success.apply)
        .recover { case NonFatal(e) => Failure(e) } ~> zip.in0

      bcast ~> flow2
        .map(Success.apply)
        .recover { case NonFatal(e) => Failure(e) } ~> zip.in1

      FlowShape(bcast.in, zip.out)
    })
      .addAttributes(Attributes(SupervisionStrategy(decider)))
      // Pairs with a failure on either side go to errorSink; the rest are unwrapped.
      .divertTo(errorSink, { case (u, v) => u.isFailure || v.isFailure })
      .collect { case (Success(u), Success(v)) => (u, v) }

  implicit class FlowExt[In, Out, Mat](flow: Flow[In, Out, Mat]) {

    def also[U](flow1: Flow[Out, U, NotUsed],
                errorSink: Graph[SinkShape[(Try[U], Try[Out])], _] = Sink.ignore
               ): Flow[In, (U, Out), Mat] = {
      flow.via(alsoFlow(flow1, errorSink))
    }

    def forkJoin[U, V](
        flow1: Flow[Out, U, NotUsed],
        flow2: Flow[Out, V, NotUsed],
        errorSink: Graph[SinkShape[(Try[U], Try[V])], _] = Sink.ignore
    ): Flow[In, (U, V), Mat] = {
      flow.via(forkJoinFlow(flow1, flow2, errorSink))
    }
  }
}
The following tests describe the desired functionality:
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
// Assumes ScalaTest 3.1+; on 3.0.x import org.scalatest.{AsyncFreeSpec, Matchers} instead.
import org.scalatest.freespec.AsyncFreeSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.Future
import scala.util.Try

class ForkJoinFlowSpec extends AsyncFreeSpec with Matchers {

  import ForkJoinFlow._

  implicit val actorSystem = ActorSystem()
  implicit val ec = actorSystem.dispatcher

  // Branch 1 parses the string as an Int; branch 2 takes its first two characters.
  val flow = Flow[String]
    .forkJoin[Int, String](
      Flow[String].mapAsync(1) { str => Future.fromTry(Try { str.toInt }) },
      Flow[String].mapAsync(1) { str => Future.fromTry(Try { str.substring(0, 2) }) },
      Sink.foreach { it: (Try[Int], Try[String]) => println(s"error: $it") })

  "The fork join flow" - {
    "should join many inputs together" in {
      Source.fromIterator(() => Seq("11", "22", "33").iterator)
        .via(flow)
        .runWith(Sink.fold(Seq.empty[(Int, String)]) { case (agg, next) => agg :+ next })
        .map { result =>
          result shouldBe Seq((11, "11"), (22, "22"), (33, "33"))
        }
    }

    "should ignore members in the second flow that correspond to failures in the first" in {
      Source.fromIterator(() => Seq("11", "bb", "33").iterator)
        .via(flow)
        .runWith(Sink.fold(Seq.empty[(Int, String)]) { case (agg, next) => agg :+ next })
        .map { result =>
          result shouldBe Seq((11, "11"), (33, "33"))
        }
    }

    "should ignore members in the first flow that correspond to failures in the second" in {
      Source.fromIterator(() => Seq("11", "2", "33").iterator)
        .via(flow)
        .runWith(Sink.fold(Seq.empty[(Int, String)]) { case (agg, next) => agg :+ next })
        .map { result =>
          result shouldBe Seq((11, "11"), (33, "33"))
        }
    }
  }
}
I get the same behavior with either Broadcast or UnzipWith. If I exclude the supervision strategy, the list is truncated: the result is Seq((11, "11")) for both of the latter tests.
If I include the supervision strategy, the pairs no longer line up: the 2nd element from the successful flow becomes paired with the 3rd element from the flow with an error, i.e. List((11,11), (2,33)) and List((11,11), (33,bb)).
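To make the mismatch concrete, here is a minimal sketch using plain Scala collections instead of Akka stages. It rests on an assumption rather than a confirmed diagnosis: that under the restarting decider the failing branch simply drops the offending element, while Zip keeps pairing whatever arrives purely by position. The names `parse`, `prefix`, and `zipDropping` are hypothetical and only mirror the two branches used in the tests.

```scala
import scala.util.Try

object ZipMisalignmentSketch {
  // Hypothetical stand-ins for the two branches used in the tests.
  def parse(s: String): Try[Int] = Try(s.toInt)
  def prefix(s: String): Try[String] = Try(s.substring(0, 2))

  // Assumption: each branch silently drops its failed elements, and the
  // survivors are then zipped purely by position.
  def zipDropping(inputs: Seq[String]): Seq[(Int, String)] = {
    val left  = inputs.flatMap(s => parse(s).toOption)
    val right = inputs.flatMap(s => prefix(s).toOption)
    left.zip(right)
  }

  def main(args: Array[String]): Unit = {
    println(zipDropping(Seq("11", "2", "33")))  // List((11,11), (2,33))
    println(zipDropping(Seq("11", "bb", "33"))) // List((11,11), (33,bb))
  }
}
```

Under that assumption the sketch reproduces both of the mismatched lists above: once one side has dropped an element, every later pair is shifted by one.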
It seems to me that the FanOut shape isn’t recovering from an exception the way I expect it to. Is this a bug, or am I doing something wrong? Or perhaps there is a better strategy for getting this kind of behavior?
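One possible strategy, sketched here with plain Futures rather than a full stream, is to capture the failure inside each element's Future so that every input yields exactly one `Try`. The helper name `lifted` is hypothetical, and `Future.transform` with a `Try => Try` function requires Scala 2.12 or later. This is a sketch of the idea under those assumptions, not a verified fix for the graph above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object LiftFailuresSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical helper: wraps the outcome of `fa` in a Try, so the returned
  // Future always completes successfully, even when `fa` itself failed.
  def lifted[A](fa: Future[A]): Future[Try[A]] =
    fa.transform[Try[A]](t => Success(t))

  // One Try per input, in input order; failures are kept, not dropped.
  def parseAll(inputs: Seq[String]): Seq[Try[Int]] =
    Await.result(
      Future.traverse(inputs)(s => lifted(Future.fromTry(Try(s.toInt)))),
      5.seconds
    )

  def main(args: Array[String]): Unit =
    println(parseAll(Seq("11", "bb", "33")).map(_.isSuccess))
}
```

In the stream, the same idea would mean each branch's `mapAsync` returns a `Future[Try[...]]`, so neither `recover` nor the supervision strategy would be needed to keep the two Zip inputs aligned. This is untested against the graph above.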