GraphStage with Shape of 2-in and 2-out

(Ignatius) #1

Hi guys,

I need to write a custom GraphStage that has two input ports, and two output ports. This GraphStage will allow two otherwise independent flows to affect each other. What shape could I use for that? FanOutShape2 Has two outputs and FanInShape2 has two inputs, but how can I have a shape that has both? Somehow combine (inherit from) both? Use BidiFlow? Make my own?

(Martynas Mickevičius) #2

You can use BidiFlow if that shape makes sense for your use-case. If it does, then you’ll get some free APIs that are defined on BidiFlow or on Flow that takes BidiFlow as an argument.

You can also create your own shape by extending Shape. There you can have as many Inlets and Outlets as you need. Look at the implementations of built-in shapes for inspiration on how to implement abstract methods of Shape.

However users will only be able to use your custom shape in GraphDSL, because APIs on Source/Flow/Sink do not know how to handle arbitrary shapes.

(Ignatius) #3

Thanks a lot for your reply, very helpful.

You can use BidiFlow if that shape makes sense for your use-case.

Is it a requirment for BidiFlow to actually go bi-directional, or can you use it just as much for two flows directed at the same sink?

(Johan Andrén) #4

With the BidiFlow.fromGraph you can construct a BidiFlow based on you own BidiShaped GraphStage which can have arbitrary element routing logic between the for in-and-outlets.

(Ignatius) #5

Thanks! To conclude this, I have now implemented my requirement, and the answer to this question is to simply use BidiShape. Despite the otherwise revealing name, the logic behind a BidiShape has to be by no means bi-directional (it’s obvious in retrospect, but I was thrown off by this).

Some code that can be used for reference if anybody is in a similar situation, where they have to do something based on two inputs, with the possibility to push to two outputs:

class BiNoneCounter[T]() extends GraphStage[BidiShape[Option[T], Option[Int], Option[T], Option[Int]]] {
  private val leftIn = Inlet[Option[T]]("BiNoneCounter.in1")
  private val rightIn = Inlet[Option[T]]("BiNoneCounter.in2")
  private val leftOut = Outlet[Option[Int]]("BiNoneCounter.out1")
  private val rightOut = Outlet[Option[Int]]("BiNoneCounter.out2")
  override val shape = BidiShape(leftIn, leftOut, rightIn, rightOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var grabNextPush = false

    val inHandler = new InHandler {
      override def onPush(): Unit = {
        if (grabNextPush) {
          (grab(leftIn), grab(rightIn)) match {
            // do stuff here
          }
        }
        grabNextPush = !grabNextPush
      }
    }

    val outHandler = (inlet: Inlet[Option[T]]) => new OutHandler {
      override def onPull(): Unit = {
        pull(inlet)
      }
    }

    setHandler(leftOut, outHandler(leftIn))
    setHandler(rightOut, outHandler(rightIn))
    setHandler(leftIn, inHandler)
    setHandler(rightIn, inHandler)
  }
}

Can be used like this:

        sourceOne ~> bidi.in1
                     bidi.out1 ~> sinkOne
        sourceTwo ~> bidi.in2
                     bidi.out2 ~> sinkTwo