Suggestion for IfThenElse Flow?


(Muthu Jayakumar) #1

Hello there,

May I suggest an addition to Flow operation with akka streams?

import akka.stream.{Attributes, FanOutShape2, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

case class IfThenElseMapFlow[T, U, V](condition: T => Boolean, thn: T => U, els: T => V) extends GraphStage[FanOutShape2[T, U, V]] {

  val in: Inlet[T] = Inlet[T]("Input")
  val outThen: Outlet[U] = Outlet("Then")
  val outElse: Outlet[V] = Outlet("Else")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) with InHandler {
      // Route each element to exactly one outlet, applying the matching function.
      override def onPush(): Unit = {
        val data = grab(in)
        if (condition(data)) {
          //isAvailable(outThen) check required?
          push(outThen, thn(data))
        } else {
          //isAvailable(outElse) check required?
          push(outElse, els(data))
        }
      }

      override def toString: String = "IfThenElseFlow"
      setHandler(in, this)

      // Demand on either outlet triggers a single pull upstream.
      Seq(outThen, outElse).foreach { o =>
        setHandler(o, new OutHandler {
          override def onPull(): Unit = {
            if (!hasBeenPulled(in)) pull(in)
          }
        })
      }
    }
  }

  override def shape: FanOutShape2[T, U, V] = new FanOutShape2(in, outThen, outElse)
}

with an example usage like…

def mapAsyncWithAcceptOrErrorFlow[T, R](parallelism: Int)(effect: T => Future[R])(implicit ec: ExecutionContext): Graph[FanOutShape2[T, (T, R), (T, Throwable)], NotUsed] = {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Left carries the successful result, Right carries the failure.
      val flowOperation: FlowShape[T, (T, Either[R, Throwable])] = b.add(Flow[T].mapAsync(parallelism) { t => effect(t).map(r => (t, Left(r): Either[R, Throwable])).recover { case error: Throwable => (t, Right(error)) } })

      val ifThen: FanOutShape2[(T, Either[R, Throwable]), (T, R), (T, Throwable)] = b.add(new IfThenElseMapFlow[(T, Either[R, Throwable]), (T, R), (T, Throwable)]( _._2.isLeft, {case (t, Left(r)) => (t, r)}, {case (t, Right(r)) => (t, r)}))

      flowOperation ~> ifThen.in

      new FanOutShape2(flowOperation.in, ifThen.out0, ifThen.out1)
    }
}

Perhaps there is a better way to do this with streams?

Please advise,

(Gergő Törcsvári) #2

You can achieve this with Partition and two maps.

You can convert the boolean function into one that returns 0 or 1, and apply the given then/else functions to the corresponding outputs. (You can even wrap it in the same signature your stage has right now.)

(Muthu Jayakumar) #3

Partition is based on UniformFanOutShape, but it's an interesting idea to use it. Thanks for the tip.


(Gergő Törcsvári) #4

I’m finally in front of my computer.

def ifThenElseMapFlow[T, U, V](condition: T => Boolean, thn: T => U, els: T => V): Graph[FanOutShape2[T, U, V], NotUsed] = {

    // Partition expects an output index: 0 = "then" branch, 1 = "else" branch.
    def conditionInt(t: T) = if (condition(t)) 0 else 1
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val partition = b.add(new Partition[T](2, conditionInt))
      val thenF = b.add(Flow[T].map(thn))
      val elseF = b.add(Flow[T].map(els))
      partition.out(0) ~> thenF
      partition.out(1) ~> elseF

      new FanOutShape2(partition.in, thenF.out, elseF.out)
    }
}

I think this will do exactly the same as your case class, without any self-made stage.
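To illustrate how the resulting two-outlet graph could be wired up, here is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem also provides the materializer). The object name IfThenElseDemo, the run() helper, and the sample functions are made up for the example; ifThenElseMapFlow repeats the Partition-based construction so the block is self-contained.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, FanOutShape2, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Future

object IfThenElseDemo {
  // Partition-based if/then/else: out0 gets thn(t), out1 gets els(t).
  def ifThenElseMapFlow[T, U, V](condition: T => Boolean, thn: T => U, els: T => V): Graph[FanOutShape2[T, U, V], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val partition = b.add(Partition[T](2, t => if (condition(t)) 0 else 1))
      val thenF = b.add(Flow[T].map(thn))
      val elseF = b.add(Flow[T].map(els))
      partition.out(0) ~> thenF
      partition.out(1) ~> elseF
      new FanOutShape2(partition.in, thenF.out, elseF.out)
    }

  // Hypothetical usage: even numbers become strings, odd numbers are multiplied by 10.
  // The two Sink.seq sinks are passed to create() so their results are materialized.
  def run()(implicit system: ActorSystem): (Future[Seq[String]], Future[Seq[Int]]) =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[String], Sink.seq[Int])((_, _)) { implicit b => (thenSink, elseSink) =>
        import GraphDSL.Implicits._
        val branch = b.add(ifThenElseMapFlow[Int, String, Int](_ % 2 == 0, n => s"even: $n", n => n * 10))
        Source(1 to 6) ~> branch.in
        branch.out0 ~> thenSink
        branch.out1 ~> elseSink
        ClosedShape
      }).run()
}
```

With the inputs 1 to 6, the first future should complete with "even: 2", "even: 4", "even: 6" and the second with 10, 30, 50. Both outlets need to be connected, since the graph will not materialize with an unconnected port.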

(Muthu Jayakumar) #5

Nice!.. Let me use this one instead.

(Johan Andrén) #6

Note that you have divertTo in the flow API, which is a special case of partition with two outlets, so there is no need to drop down to the GraphDSL:

  val evenSink = Sink.foreach((n: Int) => println(s"even: $n"))
  val oddSink = Sink.foreach((n: Int) => println(s"odd: $n"))
  Source(1 to 10).divertTo(evenSink, _ % 2 == 0).to(oddSink).run()
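Applied to the mapAsync accept-or-error use case from the first post, the same operator might look like the sketch below. It assumes Akka 2.6 or later; mapAsyncDivertErrors, DivertErrors, and run() are hypothetical names for this example, not Akka APIs. Failures are wrapped as Left and diverted to a caller-supplied sink, while successes continue downstream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

object DivertErrors {
  // Hypothetical helper: run `effect` asynchronously, send (input, error) pairs
  // to errorSink and emit (input, result) pairs downstream.
  def mapAsyncDivertErrors[T, R](parallelism: Int, errorSink: Sink[(T, Throwable), Any])(effect: T => Future[R])(implicit ec: ExecutionContext): Flow[T, (T, R), NotUsed] =
    Flow[T]
      .mapAsync(parallelism) { t =>
        effect(t)
          .map(r => (t, Right(r): Either[Throwable, R]))
          .recover { case error: Throwable => (t, Left(error)) }
      }
      // A Flow followed by a Sink is itself a Sink, so it can be passed to divertTo.
      .divertTo(
        Flow[(T, Either[Throwable, R])].collect { case (t, Left(e)) => (t, e) }.to(errorSink),
        _._2.isLeft)
      .collect { case (t, Right(r)) => (t, r) }

  // Example run: even inputs fail, odd inputs are multiplied by 10.
  def run()(implicit system: ActorSystem): Future[Seq[(Int, Int)]] = {
    import system.dispatcher
    val errors = Sink.foreach[(Int, Throwable)] { case (n, e) => println(s"failed: $n (${e.getMessage})") }
    Source(1 to 5)
      .via(mapAsyncDivertErrors[Int, Int](2, errors) { n =>
        if (n % 2 == 0) Future.failed(new RuntimeException("boom")) else Future.successful(n * 10)
      })
      .runWith(Sink.seq)
  }
}
```

Here run() should complete with (1, 10), (3, 30), (5, 50), with the two failures printed by the error sink. Note that only failed futures are captured this way; an exception thrown synchronously by effect before it returns a Future would still fail the stream.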

(Gergő Törcsvári) #7

Wow! Nice! This topic is a good example of “how well you know the library” stages :D
(maybe I need to write a stage? -> I can do it with GraphDSL -> I will use the flow API)

(Muthu Jayakumar) #8

Wow 30 lines to 10 lines to 3 lines… Waiting to see if there can be more :slight_smile: .

On a side note,

  1. How do I mix a divertTo()-style API like the one above with the GraphDSL? Could you shed some light with an example?
  2. Is there an alternative that takes a flow, i.e. divertTo(flow) instead of divertTo(sink)?
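One possible sketch covering both questions, offered as an assumption rather than an answer from the thread: a Flow built with divertTo (or divertToMat) is still an ordinary Flow, so it can be added to a GraphDSL builder like any other, and a Flow can be turned into a Sink with to/toMat before being handed to divertTo. The names DivertInGraph and run() are hypothetical, and Akka 2.6 or later is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future

object DivertInGraph {
  def run()(implicit system: ActorSystem): (Future[Seq[String]], Future[Seq[Int]]) = {
    // Question 2: "divertTo(flow)" expressed as a Flow closed off by a Sink.
    val evenBranch: Sink[Int, Future[Seq[String]]] =
      Flow[Int].map(n => s"even: $n").toMat(Sink.seq[String])(Keep.right)

    // divertToMat keeps the diverted branch's materialized value.
    val divertEvens: Flow[Int, Int, Future[Seq[String]]] =
      Flow[Int].divertToMat(evenBranch, _ % 2 == 0)(Keep.right)

    // Question 1: the diverting flow is imported into the GraphDSL like any other flow.
    RunnableGraph.fromGraph(
      GraphDSL.create(divertEvens, Sink.seq[Int])((_, _)) { implicit b => (divert, odds) =>
        import GraphDSL.Implicits._
        Source(1 to 10) ~> divert ~> odds
        ClosedShape
      }).run()
  }
}
```

With this wiring, the first future should hold "even: 2" through "even: 10" and the second the odd numbers 1, 3, 5, 7, 9. If the materialized values are not needed, plain divertTo with Sink.foreach works the same way and is shorter.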