Simplest possible parallelization of different flows

I have two different flows I want to parallelize. I don’t want the “parent” of these two flows to emit upstream until both of these “child” flows have completed, no matter how much longer the slower of the two flows takes. I know I want to use the GraphDSL but, because of this requirement that both complete, I don’t think I want to use the Balance operator since it would allow one of the two flows to fall behind.

The only way I could see to do this is with the Partition operator, and only by hacking it. As you can see below, the function passed as the second argument of Partition always returns 0; I then ignore the partition and use my own counter to decide which flow to wire up:

private def createParallelFlowsToProduceMessagesToPublishTopics()(
  implicit ec: ExecutionContext)
: Flow[PostProcessorContainer, PostProcessorContainer, NotUsed] = {
  val graph = GraphDSL.create() { implicit builder =>
    import akka.stream.scaladsl.GraphDSL.Implicits._

    val numberOfBranches = 2
    var counter = 0

    // The hack: the partitioner always returns 0, so Partition itself never
    // chooses between the branches.
    val partition = builder.add(Partition[PostProcessorContainer](numberOfBranches, _ => 0))
    val merge = builder.add(Merge[PostProcessorContainer](numberOfBranches))

    // The counter only decides, at graph-construction time, which flow gets
    // wired to which output port.
    for (_ <- 1 to numberOfBranches) {
      if (counter % numberOfBranches == 0) {
        partition ~> createFlowToProduceMessageToPublishTopic().async ~> merge
      } else {
        partition ~> createFlowToProduceMessageToUnifiedPublishTopic().async ~> merge
      }
      counter = counter + 1
    }

    FlowShape(partition.in, merge.out)
  }
  Flow.fromGraph(graph)
}

Surely, there must be a simpler, less hacky way to accomplish this with Partition. Or it may be that my misgivings about Balance are unfounded. Or it may be that I can take advantage of one of the Predefined shapes.

What’s the canonical way to express what I’m trying to accomplish?

I think there may be some terminology mixup here: elements are emitted downstream. When a source completes, that means no more elements will be emitted from it, and downstream can also complete once it has no buffered elements left. Completion therefore travels downstream.

Partition will emit each element down only one of the branches, the one your function selects, and Merge will emit downstream as soon as elements arrive from any of its upstreams.

The fan-out operator Broadcast emits each element down all of its downstreams. The fan-in operator Zip waits until it has seen one element from each of its upstreams, emits a tuple of those elements downstream, and then repeats. Is this perhaps what you are after?
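A minimal sketch of that Broadcast/Zip pairing with plain Ints (the two map stages here are just placeholder transformations):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demo")

// Fan each Int out to two branches, transform each branch differently,
// then Zip waits for one element from each branch and emits them as a pair.
val broadcastZip: Flow[Int, (Int, Int), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2))
    val zip       = builder.add(Zip[Int, Int]())
    broadcast.out(0) ~> Flow[Int].map(_ * 10)  ~> zip.in0
    broadcast.out(1) ~> Flow[Int].map(_ * 100) ~> zip.in1
    FlowShape(broadcast.in, zip.out)
  })

val result = Await.result(Source(1 to 3).via(broadcastZip).runWith(Sink.seq), 5.seconds)
println(result) // Vector((10,100), (20,200), (30,300))
system.terminate()
```

Because Zip pairs elements strictly in order, the output is deterministic even though the two branches run independently.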

Belated thanks to Johan for offering what proved to be a functionally correct solution, and for correcting my terminology too. For some reason, I had latched on to the word “emit” in the wrong way.

I want to follow up to spell out what didn’t work and to call out the limitations of what I ostensibly have working.

First, my original attempt above was based on the “Partition” example in Colin Breck’s blog post, Partitioning Akka Streams to Maximize Throughput. It seems to me now that the only way that example even works is by hacking the Partition mechanism to make it partition one way for one stream element, another way for the next, and so on.

Next I tried combining Broadcast and Merge:

private def createParallelFlowsToConditionallyProduceMessagesToPublishTopics()(
  implicit ec: ExecutionContext)
: Flow[PostProcessorContainer, PostProcessorContainer, NotUsed] = {
  val graph = GraphDSL.create() { implicit builder =>
    import akka.stream.scaladsl.GraphDSL.Implicits._

    val numberOfBranches = 2

    val broadcast = builder.add(Broadcast[PostProcessorContainer](numberOfBranches))
    val merge = builder.add(Merge[PostProcessorContainer](numberOfBranches))

    broadcast.out(0) ~> createFlowToConditionallyProduceMessageToUnifiedPublishTopic().async ~> merge
    broadcast.out(1) ~> createFlowToConditionallyProduceMessageToPublishTopic().async ~> merge

    FlowShape(broadcast.in, merge.out)
  }
  Flow.fromGraph(graph)
}

My hope was that Merge would take the two broadcast branches and combine them back into a single flow. That proved not to be the case; instead, everything got doubled up downstream. If I’d broadcast to five branches and then merged them, I’d only be multiplying my stream five-fold downstream.
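The doubling is easy to see with plain Ints, a sketch of the same Broadcast/Merge shape:

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demo")

// Broadcast copies every element to both branches; Merge then forwards both
// copies downstream, so every input element comes out twice.
val broadcastMerge = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val broadcast = builder.add(Broadcast[Int](2))
  val merge     = builder.add(Merge[Int](2))
  broadcast ~> merge
  broadcast ~> merge
  FlowShape(broadcast.in, merge.out)
})

val result = Await.result(Source(1 to 3).via(broadcastMerge).runWith(Sink.seq), 5.seconds)
println(result.sorted) // Vector(1, 1, 2, 2, 3, 3)
system.terminate()
```

The merge order of the two copies is nondeterministic, which is why the output is sorted before printing.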

Finally, here’s my naïve implementation where I was able to take advantage of Broadcast and Zip:

private def createParallelFlowsToConditionallyProduceMessages()(
  implicit ec: ExecutionContext)
: Flow[PostProcessorContainer, PostProcessorContainer, NotUsed] = {
  val graph = GraphDSL.create() { implicit builder =>
    import akka.stream.scaladsl.GraphDSL.Implicits._

    val numberOfBranches = 2

    val broadcast = builder.add(Broadcast[PostProcessorContainer](numberOfBranches))
    val zip = builder.add(Zip[PostProcessorContainer, PostProcessorContainer]())

    broadcast.out(0) ~> createFlowToConditionallyProduceMessageToUnifiedTopic() ~> zip.in0
    broadcast.out(1) ~> createFlowToConditionallyProduceMessageToPublishTopic() ~> zip.in1

    FlowShape(broadcast.in, zip.out)
  }
  Flow.fromGraph(graph)
    .map(_._1)
}

There’s nothing wrong with this implementation. It does successfully merge the two branches back into one. What makes it naïve is that it’s not really parallelizing anything. The two flows being called are just taking a ProducerMessage.Message and passing it to Producer.flexiFlow.

I suppose I could wrap a mapAsync around each of these flows. That would be an improvement, although the benefit will be limited if Producer.flexiFlow sends the message in an inherently synchronous way. I suppose I could dig into the Alpakka Kafka Producer source and see whether it uses an asynchronous Producer API from the kafka-clients library anywhere, although I see no indication in the API docs that it would.
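For what it's worth, the mapAsync idea looks something like this; produceAsync here is a hypothetical stand-in for any send operation that returns a Future, not a real Alpakka API:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demo")
implicit val ec: ExecutionContext = system.dispatcher

// Hypothetical asynchronous "produce" call returning a Future.
def produceAsync(msg: Int): Future[Int] = Future { msg }

// mapAsync(4) keeps up to four calls in flight concurrently while still
// emitting results downstream in the original order.
val result = Await.result(
  Source(1 to 5).mapAsync(4)(produceAsync).runWith(Sink.seq),
  5.seconds)
println(result) // Vector(1, 2, 3, 4, 5)
system.terminate()
```

Note that mapAsync preserves element order, which matters when downstream commits Kafka offsets.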

If I have to boil down this follow-up post to one simple question, it would be: Is there any way to call the Alpakka Kafka Producer APIs to tap into any asynchronous message producing provided by the kafka-clients library?

Context: I’m coming from the experience of having used the DataStax Java Driver (the Cassandra client), whose Session.executeAsync method returns a future type, giving me the asynchronous primitive with which I can compose all sorts of parallelization hierarchies.

Did you read the docs section on pipelining and parallelism? Maybe that could provide some further insight: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#pipelining-and-parallelism

Note that nothing here is synchronous; the stream is just as async as if you had constructed it using Futures. It is just comparable to a set of futures that depend on each other in a way where nothing actually happens in parallel.

Getting parallelization in-stream in the face of Kafka offset-based commits, which require that elements/offsets are not re-ordered, can be somewhat tricky.

One way would be to add a same-sized buffer at the end of each of your broadcast branches before zipping; that would make it work similarly to having a mapAsync(n) in there.
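A sketch of that buffering idea, applied to the Broadcast/Zip shape from the post above (the two map stages are placeholder work, and bufferSize is an arbitrary choice):

```scala
import akka.actor.ActorSystem
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demo")

// Same-sized buffers on both branches let each branch run up to bufferSize
// elements ahead before Zip re-pairs them, similar in effect to mapAsync(n).
val bufferSize = 16
val parallelBranches = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val broadcast = builder.add(Broadcast[Int](2))
  val zip       = builder.add(Zip[Int, Int]())
  broadcast.out(0) ~> Flow[Int].map(_ + 1).async
    .buffer(bufferSize, OverflowStrategy.backpressure) ~> zip.in0
  broadcast.out(1) ~> Flow[Int].map(_ * 2).async
    .buffer(bufferSize, OverflowStrategy.backpressure) ~> zip.in1
  FlowShape(broadcast.in, zip.out)
}).map(_._1) // keep only the first branch's output, as in the Zip example above

val result = Await.result(Source(1 to 3).via(parallelBranches).runWith(Sink.seq), 5.seconds)
println(result) // Vector(2, 3, 4)
system.terminate()
```

Zip still re-pairs elements strictly in order, so ordering (and therefore offset-commit safety) is preserved.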

A simpler setup to reason about may be to instead run more parallel streams with the same consumer group, thereby sharding the partitions over more processing streams (a bonus is that this also trivially scales out over several machines if you need that).
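A sketch of that multi-stream setup with Alpakka Kafka; the broker address, topic, and group id are placeholders, and the per-record processing is elided. It is not runnable without a Kafka broker:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("demo")

// Hypothetical broker address and group id. Every stream below joins the
// same consumer group, so Kafka shards the topic's partitions across them.
val settings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("post-processor-group")

val parallelism = 4
val streams = (1 to parallelism).map { _ =>
  Consumer
    .plainSource(settings, Subscriptions.topics("publish-topic"))
    .map(_.value) // stand-in for the real per-record processing
    .runWith(Sink.ignore)
}
```

Each materialized stream consumes only its assigned partitions, so per-partition ordering (and offset committing) stays intact while the streams run in parallel.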