Optional Flow for messages

Hi,

I’ve run into the following use case several times and would like to know the idiomatic way to solve it.

I would like to read a message from Kafka and do some processing that yields an optional result, which needs to be written to a database. Once it has been written successfully (or if the processing produced no result), the offset needs to be committed to Kafka.

The problem I run into is that all Alpakka connectors provide flows for writing records to a database, not optional records. So when the processing does not produce a result, the database write flow should be skipped, but without changing the order of the messages.

Up until now I have solved this without an Alpakka connector, instead using an asynchronous database driver to optionally write records to the database, which returns a Future. However, I would like to use an Alpakka connector for this. Any idea how to do this? I’ve attempted to use Partition and MergeSorted but have not been successful (yet ;)). Is there a better way of doing this?
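For reference, the Future-based workaround can be modeled roughly like this. It is a plain-Python sketch, not Akka Streams code. `write_record` is a hypothetical stand-in for the async driver call, and `map_async` only approximates Akka Streams' mapAsync (bounded parallelism, results kept in input order). Every element yields its offset whether or not a write happened, so the offsets can be committed in order.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

Item = Tuple[int, Optional[str]]  # (Kafka offset, optional processing result)


async def write_record(db: List[str], record: str) -> None:
    """Hypothetical async driver call standing in for a Future-returning write."""
    await asyncio.sleep(0.01)  # simulate database I/O
    db.append(record)


async def maybe_write(db: List[str], offset: int, result: Optional[str]) -> int:
    """Write only when there is a result; either way, return the offset to commit."""
    if result is not None:
        await write_record(db, result)
    return offset


async def map_async(
    items: List[Item],
    parallelism: int,
    f: Callable[[int, Optional[str]], Awaitable[int]],
) -> List[int]:
    """Rough model of mapAsync: at most `parallelism` calls in flight, results in input order."""
    sem = asyncio.Semaphore(parallelism)

    async def run(item: Item) -> int:
        async with sem:
            return await f(*item)

    # gather returns results in the order of its arguments, not in completion order
    return list(await asyncio.gather(*(run(item) for item in items)))


db: List[str] = []
items: List[Item] = [(0, "a"), (1, None), (2, "b")]
offsets = asyncio.run(map_async(items, 2, lambda o, r: maybe_write(db, o, r)))
# offsets == [0, 1, 2]
```

This keeps ordering without any graph juggling; the trade-off is that the write bypasses the Alpakka connector entirely.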

I hope my question is clear; if it’s not, be sure to ask for additional details ;) Thanks in advance!

Kind regards,
Jan-Pieter

This is somewhat tricky because you need to maintain the ordering, so that committing to Kafka doesn’t break, while still passing elements down two different paths in the graph.

It could be achieved using a graph with partition and zip (@ennru showed me a suggested solution the other day), or it could potentially be done using .flatMapConcat. Both solutions will be slow, because each element’s write has to complete before the next element (written or not) can pass through the stream.
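The .flatMapConcat idea can be illustrated with a small plain-Python model (again, not Akka Streams code). `flat_map_concat` mimics the operator's behavior of draining each element's sub-stream completely before pulling the next element, and `db_write_flow` is a hypothetical stand-in for an Alpakka write flow. Each element becomes a one-element sub-stream that either runs through the write flow or passes straight through. In Akka Streams terms this corresponds roughly to returning `Source.single(record).via(writeFlow)` when there is a result and `Source.single(passthrough)` otherwise, where `writeFlow` and `passthrough` are hypothetical names.

```python
from typing import Callable, Iterable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def flat_map_concat(source: Iterable[T], f: Callable[[T], Iterable[R]]) -> Iterator[R]:
    """Model of flatMapConcat: drain each element's sub-stream fully, one at a time, in order."""
    for element in source:
        yield from f(element)


def db_write_flow(db: List[str]) -> Callable[[Iterable[str]], Iterator[str]]:
    """Hypothetical stand-in for a connector flow: writes each record, then emits it."""
    def flow(records: Iterable[str]) -> Iterator[str]:
        for record in records:
            db.append(record)
            yield record
    return flow


def optional_write(db: List[str]) -> Callable[[Tuple[int, Optional[str]]], Iterator[int]]:
    """Build the per-element sub-stream: run the write flow only when there is a result."""
    write = db_write_flow(db)

    def sub_stream(element: Tuple[int, Optional[str]]) -> Iterator[int]:
        offset, result = element
        if result is not None:
            for _ in write([result]):  # let the write complete before moving on
                pass
        yield offset  # emitted whether or not a write happened
    return sub_stream


db: List[str] = []
offsets = list(flat_map_concat([(0, "a"), (1, None), (2, "b")], optional_write(db)))
# offsets == [0, 1, 2], db == ["a", "b"]
```

Because each sub-stream finishes its write before the next element is pulled, only one write is ever in flight, which is where the slowness comes from.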

We are discussing whether there is some missing API we could provide here, but it’s not obvious that there is a good general solution.

Since Kafka is offset based, committing only the offsets of non-empty elements would still protect against writing the same element to the database twice, at the cost of potentially having to re-process trailing empty elements when the stream restarts. The good part, if this is an option for you, is that you could still use batching and similar techniques and get good performance.
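The offset argument can be checked with a toy, plain-Python model of a single partition. Assumptions (all hypothetical, not Kafka or Alpakka APIs): the log is a list of `(offset, optional_result)` pairs, `committed` is the last fully handled offset (Kafka itself stores the next offset to read, but the idea is the same), non-empty results are written in batches, and only the offset of the last written element in each batch is committed.

```python
from typing import List, Optional, Tuple


def consume(
    log: List[Tuple[int, Optional[str]]],
    committed: int,
    db: List[str],
    batch_size: int = 2,
) -> Tuple[int, List[int]]:
    """Process everything after `committed`; return (new committed offset, offsets processed)."""
    processed: List[int] = []
    batch: List[Tuple[int, str]] = []

    def flush() -> None:
        nonlocal committed
        if batch:
            db.extend(result for _, result in batch)  # one batched write
            committed = batch[-1][0]  # commit only the offset of a written element
            batch.clear()

    for offset, result in log:
        if offset <= committed:
            continue  # already covered by the committed offset
        processed.append(offset)
        if result is not None:
            batch.append((offset, result))
            if len(batch) == batch_size:
                flush()
    flush()
    return committed, processed


log = [(0, "a"), (1, None), (2, "b"), (3, "c"), (4, None), (5, None)]
db: List[str] = []
committed, processed = consume(log, -1, db)         # first run
# committed == 3, processed == [0, 1, 2, 3, 4, 5], db == ["a", "b", "c"]
committed, processed = consume(log, committed, db)  # restart
# committed == 3, processed == [4, 5], db unchanged
```

The first run writes a, b and c and commits offset 3; after a restart, the trailing empty elements 4 and 5 are processed again, but no record is written twice.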
