Splitting messages into separate streams and maintaining order

The background for this is that I am doing my due diligence while documenting Document how to skip messages when consuming and publishing events.

In general the problem is how to split messages into separate streams and maintain order when they are combined back together?

The specific problem here is that we have to provide a Flow[Payload, Done, NotUsed] to Subscriber.atLeastOnce. It must emit exactly one Done element for every Payload element. There are situations where we want to effectively skip certain elements by mapping them straight to Done. The ones we don’t skip will perform some side effect before emitting Done.

The simple example I was going to use was:

    helloService
      .greetingsTopic()
      .subscribe
      .atLeastOnce(
        Flow[GreetingMessage].map {
          case msg@GreetingMessage(message) if message.startsWith("Hello") => doSomethingWithTheMessage(msg)
          case _ => Done
        }
      )

This only works in cases where a function is all that is needed. If we want to use a stream from another library like Alpakka for example we need something like:

    helloService
      .greetingsTopic()
      .subscribe
      .atLeastOnce(flow)

    def flow: Flow[GreetingMessage, Done, NotUsed] = {
      Flow.fromGraph(GraphDSL.create() { implicit builder =>
        val split = builder.add(partition)
        val combine = builder.add(Merge[Done](2))
        split ~> logic  ~> combine
        split ~> ignore ~> combine
        FlowShape(split.in, combine.out)
      })
    }

    def partition = {
      Partition[GreetingMessage](2, {
        case msg@GreetingMessage(message) if message.startsWith("Hello") => 0
        case _ => 1
      })
    }

    // This flow must be an attached stage otherwise the Done messages could get out of order.
    def logic: Flow[GreetingMessage, Done, NotUsed] = {
      Flow.fromFunction(doSomethingWithTheMessage)
    }

    def ignore: Flow[GreetingMessage, Done, NotUsed] = {
      Flow.fromFunction((_: GreetingMessage) => Done)
    }

The main caveat that I can think of here is that logic must be an “attached flow” (is this known term?). Is there a better way to do this?

What advice should we give to end users when creating such a flow?

Not sure what you mean by “attached Flow”, though yes such flow must be exactly “one to one”, it must not drop elements and it must not emit more elements for a single input. There is no way around it, the logic otherwise simply does not work out.

We internally have a BidiFlow which checks for such behavior actually;

  /**
   * Creates a generic ``BidiFlow`` which verifies that another flow produces exactly one output element per
   * input element, at the right time. Specifically it
   *
   * 1. triggers an ``UnexpectedOutputException`` if the inner flow produces an output element before having
   *    consumed the respective input element.
   * 2. triggers an `OutputTruncationException` if the inner flow completes before having produced an output element
   *    for every input element.
   * 3. triggers an `OutputTruncationException` if the inner flow cancels its inputs before the upstream completes its
   *    stream of inputs.
   * 4. Backpressures the input side if the maximum number of pending output elements has been reached,
   *    which is given via the ``maxPending`` parameter. You can use -1 to disable this feature.
   */
  def apply[I, O](
    maxPending:                Int,
    outputTruncationException: Int ⇒ Throwable = OutputTruncationException,
    unexpectedOutputException: Any ⇒ Throwable = UnexpectedOutputException): BidiFlow[I, I, O, O, NotUsed] =
    BidiFlow.fromGraph(new One2OneBidi[I, O](maxPending, outputTruncationException, unexpectedOutputException))

in Akka HTTP;


Though your question is about “split and maintain order” which is somewhat at odds with eachother, and definitely not doable with a partition and merge – these don’t guarantee such things.

You’d have to implement a stage which does exactly that: routes into a logic, verifies it does not push more than once for each push it has received etc (enforcing the 1-to-1–nness) and by encapsulating “where it pushed first”, it could then “wait for that logic to give it an element, before accepting elements from other logics it has pushed”.

In other words, this would be useful, but does not exist as pre-existing stage I believe. It could be an interesting contribution to Akka Streams; In a way it is like an mapAsync(parallelism), but for arbitrary however strictly 1-to-1 Flows. Could be interesting, I agree.

By “attached flow” I mean the opposite of a “detached flow” which I have heard before, I think it was from Viktor Klang, when I was enquiring about back-pressure behaviour some time ago. The docs mention it very briefly in Simple processing stages.

I think there is more to it than just being “one to one” in terms of emitting elements, the demand flowing upstream must also be be “one to one”. For example a stage that buffers elements could cause problems since it will pull upstream multiple times and potentially cause elements to go down the other stream and break the order.

Having a stage to verify the required behaviour is an interesting idea. We would still need a way to explain in words to a user what the requirements of their logic flow are. It is this explanation, caveats, etc. that I am trying to nail down at this point.

Yeah “rate detached” is usually how we refer to them, I’ll make a ticket to fix that docs reference to be more clear.

Yes; check the impl of that I linked :slight_smile:

Yeah; not sure if it’ll work out well, but maybe it will. Curious to see a PR if you’d have time to work on it.

(I posted a similar comment on the Lagom PR that initiated this discussion)

Wouldn’t it be simpler to keep it as just requiring a function?

You only need a function String => Future[Done], what happens inside that function can be of arbitrary complexity without compromising the ordering of the outer stream. Or am I missing something?

def doSomethingWithTheMessage(msg: GreetingMessage): Future[Done] = {
  // do here whatever is needed
  Source
    .single(msg)
    .map { i =>
      println(s"processing: '$i'")
      Done
    }
    .runWith(Sink.last)
}


helloService
  .greetingsTopic()
  .subscribe
  .atLeastOnce(
    Flow[String].mapAsync(1) {
      case msg@GreetingMessage(message) if message.startsWith("Hello") => doSomethingWithTheMessage(msg)
      case _ => Future.successful(Done)
    }
  )