Integrate a flow of a different type easier

Hey guys,

I have the following problem quite often:
I have a type A and a function that transform it to type B (but a transformation back is not possible).
Then I want to run a Flow<B,B,NotUsed> (e.g. writing something to a database) but then I want to proceed with type A. I need the guarantee that the further step (for that element in the stream) will be executed after the Flow<B,B,NotUsed> is already executed. Therefore source.alsoTo() is not an option.

I’m aware, that I could use a Pair, but then the Flow<B,B,NotUsed> must be something like Flow<Pair<A,B>,Pair<A,B>,NotUsed>. That seems for me too complicated.
Also, a mapAsync could solve the problem:

source()
 .mapAsync(parallelism, a -> Source.single(toB(a)).via(flow()).runWith(Sink.head(),...).thenApply(__ -> a))

That’s still not a good solution because I start a stream just for one element.

I would like to have something like:

source().via(a -> toB(a), flow())

What do you think about that?

.flatMapConcat with Source.single(a), and in the end map b back to a, the latter being still in scope of flatMapConcat

Something like

source().flatMapConcat(a -> Source.single(a).via(toB()).via(action()).map(b -> a))

(I’m not familiar with the Java DSL)

Nice!

Thank you!

SourceWithContext might also be useful here.

In the Java API, that would look something like:

SourceWithContext<B, A, Object> withContext =
    source()
        .asSourceWithContext(a -> a)  // make the incoming A the context
       .map(toB)  // ...and the B the element

Source<A, NotUsed> =
    withContext
        .map(b -> doStuffWithB())
        .asSource()
        .map(Pair::second)

The Pairing of A and B within the SourceWithContext is hidden; since the common use-case for this will have restrictions on reordering of elements, none of the operations which can reorder elements are directly supported: you can use a via to a flow from pairs to pairs and handle context propagation yourself.

That’s I’m looking for! Thank you!

1 Like