Can using mapConcat with PassThroughFlow result in a deadlock?

Hi,

I would like confirmation about my usage of PassThroughFlow.

Reading the documentation, the following quote makes me think that my usage is wrong:

"This flow combinator is guaranteed to work correctly on flows that have behavior of classic total functions, meaning that they should be a one-to-one functions that don’t reorder, drop, inject etc new elements."

I’m using it in the following way:

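import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Flow

// K and V stand for the key and value types; someIO, someLogic1 and someLogic2 are defined elsewhere.
val innerFlow =
  Flow[CommittableMessage[K, V]]
    .map { msg =>
      someIO(msg) // performs some IO and returns a list
    }
    .mapConcat(identity) // 1:N: emits one element per item of the list
    .via(someLogic1)
    .via(someLogic2)

Consumer
  .committableSource(settings, Subscriptions.topics("my-topic"))
  .via(PassThroughFlow(innerFlow))
  // the tuple order depends on the PassThroughFlow definition in use
  .map { case (committableMessage, processingResult) => ??? }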

The innerFlow takes one message from Kafka and may return more than one output, so the function is not 1:1 but 1:N. So it seems to me that this is the wrong way to use PassThroughFlow, right? Or is there something that I didn’t understand?
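
To make my concern concrete, here is a minimal sketch using plain Scala collections (no Akka involved). It assumes that PassThroughFlow pairs the n-th output of the inner flow with the n-th original element, the way a zip stage would. The names `inputs`, `fakeIO`, `outputs` and `zipped` are hypothetical and only model the behaviour; a real stream could additionally stall, for example if the inner flow emits nothing for some message.

```scala
// Plain-collection model of "broadcast, process one branch, zip back together".
// Assumption: outputs are paired with the originals purely by position.
object ZipMisalignment {
  // Stand-ins for three Kafka messages.
  val inputs: List[String] = List("m1", "m2", "m3")

  // Hypothetical 1:N step: m1 yields two results, every other message yields one.
  def fakeIO(msg: String): List[String] = msg match {
    case "m1"  => List("m1-a", "m1-b")
    case other => List(other + "-a")
  }

  // Equivalent of map(...).mapConcat(identity) on the processing branch.
  val outputs: List[String] = inputs.flatMap(fakeIO)

  // Positional pairing of originals and results, as a zip stage would do.
  val zipped: List[(String, String)] = inputs.zip(outputs)

  def main(args: Array[String]): Unit =
    zipped.foreach(println)
}
```

In this model "m1-b" ends up paired with "m2" and "m2-a" with "m3", so anything committed downstream would be based on the wrong message.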

By the way, here is the general use case that I’m trying to solve:

  • For each message from Kafka, one flow does some IO that returns a List of N elements, and emits a (CommittableOffset, List[T]).

  • I would like to use mapConcat, but since I need to commit at the end of the graph I cannot: the CommittableOffset would be the same for every element of the list, so I would commit the same offset multiple times.

This is why I tried to use PassThroughFlow: to wrap the logic in an innerFlow that does this job without having to take care of commits.
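
One alternative I am considering is to flatten the list myself and attach the offset only to the last element, so that each offset reaches the commit stage exactly once. The following is only a sketch in plain Scala under my own assumptions: `Offset` is a hypothetical stand-in for CommittableOffset, and `expand` is a helper name I made up, not an Alpakka API.

```scala
object TagLastWithOffset {
  // Hypothetical stand-in for a Kafka committable offset.
  final case class Offset(value: Long)

  // Expands one (offset, results) pair into per-element pairs in which
  // only the last element carries the offset. An empty result list still
  // produces one marker element, so the offset is not lost.
  def expand[T](offset: Offset, results: List[T]): List[(Option[T], Option[Offset])] =
    results match {
      case Nil => List((None, Some(offset)))
      case _ =>
        val lastIndex = results.size - 1
        results.zipWithIndex.map { case (r, i) =>
          (Some(r), if (i == lastIndex) Some(offset) else None)
        }
    }

  def main(args: Array[String]): Unit =
    expand(Offset(42L), List("a", "b", "c")).foreach(println)
}
```

In a stream, `expand` would be the function passed to mapConcat, and the commit stage would only act on elements whose offset is defined. The drawback is that someLogic1 and someLogic2 would have to carry the optional offset along with each element.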