Suggestion for IfThenElse Flow?

Hello there,

May I suggest an addition to the Flow operations in Akka Streams?

case class IfThenElseMapFlow[T, U, V](condition: T => Boolean, thn: T => U, els: T => V) extends GraphStage[FanOutShape2[T, U, V]]{

  val in: Inlet[T] = Inlet[T]("Input")
  val outThen: Outlet[U] = Outlet("Then")
  val outElse: Outlet[V] = Outlet("Else")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) with InHandler {
      override def onPush(): Unit = {
        val data = grab(in)
        if (condition(data)) {
          //isAvailable(outThen) check required?
          push(outThen, thn(data))
        } else {
          //isAvailable(outElse) check required?
          push(outElse, els(data))
        }
      }

      override def toString: String = "IfThenElseFlow"
      setHandler(in, this)

      Seq(outThen, outElse).foreach { o=>
        setHandler(o, new OutHandler {
          override def onPull(): Unit = {
            if (!hasBeenPulled(in)) pull(in)
          }
        })
      }
    }
  }

  override def shape: FanOutShape2[T, U, V] = new FanOutShape2(in, outThen, outElse)
}

with an example usage like…

def mapAsyncWithAcceptOrErrorFlow[T, R](parallelism: Int)(effect: T => Future[R])(implicit ec: ExecutionContext): Graph[FanOutShape2[T, (T, R), (T, Throwable)], NotUsed] = {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val flowOperation: FlowShape[T, (T, Either[R, Throwable] with Product with Serializable)] = b.add(Flow[T].mapAsync(parallelism) { t => effect(t).map(r => (t, Left(r))).recoverWith{ case error: Throwable => Future(t, Right(error))} })

      val ifThen: FanOutShape2[(T, Either[R, Throwable]), (T, R), (T, Throwable)] = b.add(new IfThenElseMapFlow[(T, Either[R, Throwable]), (T, R), (T, Throwable)]( _._2.isLeft, {case (t, Left(r)) => (t, r)}, {case (t, Right(r)) => (t, r)}))

      flowOperation ~> ifThen.in

      new FanOutShape2(flowOperation.in, ifThen.out0, ifThen.out1)
    }
  }

Perhaps there is a better way to do this with streams?

Please advise,
Muthu

You can achieve this with Partition and two maps.

You can convert the Boolean function into one that returns 0 or 1, and apply the given then/else functions to the associated outputs. (You can even wrap it in the same signature your stage has right now.)

Partition is based on UniformFanOutShape, but it's an interesting idea to use it. Thanks for the tip.

Thanks
Muthu

I’m finally in front of my computer.

def ifThenElseMapFlow[T, U, V](condition: T => Boolean, thn: T => U, els: T => V): Graph[FanOutShape2[T, U, V], NotUsed] = {

    def conditionInt(t:T) = if (condition(t)) 0 else 1
    
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      
      val partition = b.add(new Partition[T](2, conditionInt))
      val thenF = b.add(Flow[T].map(thn))
      val elseF = b.add(Flow[T].map(els))
      
      partition.out(0) ~> thenF
      partition.out(1) ~> elseF

      new FanOutShape2(partition.in, thenF.out, elseF.out)
    }
  }

I think this does exactly the same as your case class, without any custom stage.
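
For completeness, here is a rough sketch of how the graph above could be wired up and run. It assumes Akka 2.6+ (so the materializer comes from the implicit ActorSystem); the object name, `splitEvenOdd`, and the even/odd example are made up for illustration, and `Partition.apply` is used in place of `new Partition`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, FanOutShape2, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Future

// Hypothetical wrapper object, only here to make the sketch self-contained.
object IfThenElseUsage {

  // Same graph as above, repeated so the sketch compiles on its own.
  def ifThenElseMapFlow[T, U, V](condition: T => Boolean, thn: T => U, els: T => V): Graph[FanOutShape2[T, U, V], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val partition = b.add(Partition[T](2, t => if (condition(t)) 0 else 1))
      val thenF = b.add(Flow[T].map(thn))
      val elseF = b.add(Flow[T].map(els))
      partition.out(0) ~> thenF
      partition.out(1) ~> elseF
      new FanOutShape2(partition.in, thenF.out, elseF.out)
    }

  // Routes even numbers to the "then" outlet and odd numbers to the "else" outlet,
  // collecting each side into its own Sink.seq.
  def splitEvenOdd(numbers: immutable.Seq[Int])(implicit system: ActorSystem): (Future[immutable.Seq[String]], Future[immutable.Seq[String]]) = {
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[String], Sink.seq[String])((_, _)) { implicit b => (evenSink, oddSink) =>
        import GraphDSL.Implicits._
        val split = b.add(ifThenElseMapFlow[Int, String, String](_ % 2 == 0, n => s"even: $n", n => s"odd: $n"))
        Source(numbers) ~> split.in
        split.out0 ~> evenSink
        split.out1 ~> oddSink
        ClosedShape
      })
    graph.run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("if-then-else-usage")
    import system.dispatcher
    val (evens, odds) = splitEvenOdd(1 to 10)
    // Print both sides once they complete, then shut the system down.
    for (e <- evens; o <- odds) {
      println(e)
      println(o)
      system.terminate()
    }
  }
}
```

Both outlets have to be connected before the graph can be closed, which is why each side gets its own sink here.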

Nice!.. Let me use this one instead.

Note that you have divertTo in the flow API, which is a special case of partition with two outlets, so there is no need to drop down to the GraphDSL:

  val evenSink = Sink.foreach((n: Int) => println(s"even: $n"))
  val oddSink = Sink.foreach((n: Int) => println(s"odd: $n"))
  Source(1 to 10).divertTo(evenSink, _ % 2 == 0).to(oddSink).run()
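
If the then/else mapping from the original stage is also needed, one option is to put a map in front of the diverted sink and another map on the remaining stream. A minimal sketch, assuming Akka 2.6+ and using `divertToMat` so both results can be collected; the object and method names are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Future

// Hypothetical wrapper object, only here to make the sketch self-contained.
object DivertToWithMap {

  // Returns (results of the diverted "then" side, results of the remaining "else" side).
  def run(numbers: immutable.Seq[Int])(implicit system: ActorSystem): (Future[immutable.Seq[String]], Future[immutable.Seq[String]]) = {
    // "then" branch: a map glued in front of a sink; Keep.right keeps the sink's Future.
    val thenSink = Flow[Int].map(n => s"even: $n").toMat(Sink.seq[String])(Keep.right)

    Source(numbers)
      .divertToMat(thenSink, (n: Int) => n % 2 == 0)(Keep.right) // evens leave the stream here
      .map(n => s"odd: $n")                                      // "else" branch: whatever was not diverted
      .toMat(Sink.seq[String])(Keep.both)
      .run()
  }
}
```

Because `Flow[...].to(sink)` is itself a Sink, any flow-shaped processing can be attached to the diverted side this way.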

Wow! Nice! This topic is a good example of “how well you know the library” stages :D
(maybe I need to write a stage? -> I can do it with GraphDSL -> I will use the flow API)

Wow, 30 lines to 10 lines to 3 lines… Waiting to see if there can be more :)

On a side note,

  1. How do I mix a divertTo()-style API like the one above with GraphDSL? Could you shed some light with an example?
  2. Are there any alternatives that take a flow, i.e. divertTo(flow) instead of divertTo(sink)?

Thanks,
Muthu
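
On question 1: as far as I can tell, a Flow that uses divertTo is still an ordinary Flow, so it can be imported into a GraphDSL builder with `b.add` like any other flow. A rough sketch, assuming Akka 2.6+; the object name, `oddsOnly`, and the even/odd predicate are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Future

// Hypothetical wrapper object, only here to make the sketch self-contained.
object DivertToInGraphDsl {

  // Returns the elements that were NOT diverted (the odd numbers).
  def run(numbers: immutable.Seq[Int])(implicit system: ActorSystem): Future[immutable.Seq[Int]] = {
    val evenSink = Sink.foreach[Int](n => println(s"even: $n"))

    // A plain Flow: evens are diverted to evenSink, only odds continue downstream.
    val oddsOnly: Flow[Int, Int, NotUsed] = Flow[Int].divertTo(evenSink, _ % 2 == 0)

    val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int]) { implicit b => out =>
      import GraphDSL.Implicits._
      val divert = b.add(oddsOnly) // imported like any other FlowShape
      Source(numbers) ~> divert ~> out
      ClosedShape
    })
    graph.run()
  }
}
```

On question 2: I am not aware of a divertTo variant that takes a flow. Since a flow followed by `.to(sink)` is itself a sink, prepending the flow to the diverted sink should give much the same effect.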
