Forking a flow based on a boolean condition


(Mitch Gitman) #1

I’m not asking a question here. I was all set to ask one while implementing this, but I ended up with a working solution, so I’m sharing it in case someone else comes along with much the same question.

I have an Akka Streams (Alpakka Kafka connector) workflow that is consuming messages off one Kafka topic, persisting to a database, producing messages to another Kafka topic, and then finally committing the offsets back to the original Kafka topic. I needed to introduce some logic to produce to the downstream Kafka topic only if the message being processed is valid according to our business logic. Whether the message is valid or invalid, the Kafka message’s offset still needs to be committed back upon successful processing.

I couldn’t have pieced this solution together without the examples I found while searching on this problem. What I found missing in those examples was the minimal case of injecting a Flow without any Source or Sink involved; none of them had quite distilled my problem down to its essence.

My solution starts with two alternate flows that I want to pursue depending on whether the message is valid. They’re created by the following two methods:

  • createFlowToProduceValidMessagesToPublishTopic

  • createFlowToSkipInvalidMessages

Both return a Flow[ValidatedCommittableMessage, ConsumerMessage.CommittableOffset, NotUsed], where ValidatedCommittableMessage is a domain type that carries a valid flag.

Based on this, I’m able to use the Partition and Merge constructs of the GraphDSL to fan out and then fan back in:

private def createPartitioningFlowBasedOnWhetherMessageIsValid()(
    implicit ec: ExecutionContext): Flow[ValidatedCommittableMessage, ConsumerMessage.CommittableOffset, NotUsed] = {
  val graph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Route each message to output 1 if it is valid, output 0 if it is not.
    val partition = builder.add(Partition[ValidatedCommittableMessage](2, message => if (message.valid) 1 else 0))
    // Fan both branches back in to a single stream of offsets to commit.
    val merge     = builder.add(Merge[ConsumerMessage.CommittableOffset](2))

    partition.out(0) ~> createFlowToSkipInvalidMessages() ~> merge
    partition.out(1) ~> createFlowToProduceValidMessagesToPublishTopic() ~> merge

    FlowShape(partition.in, merge.out)
  }
  Flow.fromGraph(graph)
}

Asynchronous semantics are handled elsewhere. With that, I can insert the result into the overall flow with the via operator, so that the conditional forking is encapsulated:

.via(createPartitioningFlowBasedOnWhetherMessageIsValid())
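
For anyone who wants to try the Partition and Merge pattern in isolation, here is a minimal sketch with no Kafka involved. It assumes Akka 2.6 or later (so the ActorSystem supplies the materializer). Msg, Outcome, skipInvalid, handleValid and run are hypothetical stand-ins for the domain type and the two flows above, not the real implementations. One thing to keep in mind: Merge emits elements as they become available on either input, so ordering across the two branches is not guaranteed in general.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionMergeSketch {
  // Hypothetical stand-in for ValidatedCommittableMessage.
  final case class Msg(offset: Long, valid: Boolean)
  // Hypothetical stand-in for the committable offset; records which branch ran.
  final case class Outcome(offset: Long, published: Boolean)

  // Stand-in for the "skip" branch: passes the offset along and does nothing else.
  val skipInvalid: Flow[Msg, Outcome, NotUsed] =
    Flow[Msg].map(m => Outcome(m.offset, published = false))

  // Stand-in for the "produce" branch: a real flow would publish downstream here.
  val handleValid: Flow[Msg, Outcome, NotUsed] =
    Flow[Msg].map(m => Outcome(m.offset, published = true))

  // Fan out on the valid flag, run one branch per message, fan back in.
  val forked: Flow[Msg, Outcome, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val partition = builder.add(Partition[Msg](2, m => if (m.valid) 1 else 0))
      val merge     = builder.add(Merge[Outcome](2))

      partition.out(0) ~> skipInvalid ~> merge
      partition.out(1) ~> handleValid ~> merge

      FlowShape(partition.in, merge.out)
    })

  // Runs the inputs through the forked flow and collects the results.
  def run(inputs: List[Msg]): Seq[Outcome] = {
    implicit val system: ActorSystem = ActorSystem("partition-merge-sketch")
    try Await.result(Source(inputs).via(forked).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Calling PartitionMergeSketch.run with a mix of valid and invalid messages should yield one Outcome per input, with published set according to the branch taken.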

This works for me. But if anyone has a cleaner way of pulling this off…


(Mitch Gitman) #2

Wouldn’t you know, if the question is only whether to publish to a downstream Kafka topic, there is a more elegant, more “This is what you’d do with the Alpakka Kafka connector” solution. It’s the use of ProducerMessage.PassThroughMessage, as documented under Conditional Message Processing in the At-Least-Once Delivery documentation for the Kafka connector.
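
A rough sketch of what that could look like, assuming Alpakka Kafka 1.0 or later with String keys and values. Validated, toEnvelope and publishIfValid are hypothetical names, as is the topic parameter. The type parameter P stands for whatever has to survive the producer stage, which in my case would be the CommittableOffset.

```scala
import akka.NotUsed
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.producer.ProducerRecord

object PassThroughSketch {
  // Hypothetical, simplified stand-in for ValidatedCommittableMessage.
  final case class Validated[P](value: String, valid: Boolean, passThrough: P)

  // Valid messages become a record to publish; invalid ones become a
  // pass-through envelope that publishes nothing but still carries P along.
  def toEnvelope[P](topic: String)(m: Validated[P]): ProducerMessage.Envelope[String, String, P] =
    if (m.valid)
      ProducerMessage.single(new ProducerRecord[String, String](topic, m.value), m.passThrough)
    else
      ProducerMessage.passThrough[String, String, P](m.passThrough)

  // A single linear flow: no Partition or Merge needed, since flexiFlow
  // handles both kinds of envelope and emits a result for each.
  def publishIfValid[P](
      settings: ProducerSettings[String, String],
      topic: String): Flow[Validated[P], P, NotUsed] =
    Flow[Validated[P]]
      .map(m => toEnvelope(topic)(m))
      .via(Producer.flexiFlow[String, String, P](settings))
      .map(_.passThrough)
}
```

The resulting flow has the same shape as the Partition+Merge version (message in, offset out), so it can be dropped into the overall stream with via in the same way.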

I’ll implement that instead for my use case. I’ll keep the Partition+Merge technique in mind for a more generic use case.