How to serialize materialization of `concat`enated sources?


(Denis Mikhaylov) #1

So I have a Source of Committable[A] elements

case class Committable[A](commit: () => Future[Unit], value: A)
val source: Source[Committable[A], NotUsed] = ...

and I want to transform it into a Source that runs commit only after downstream has successfully processed value.

The naive implementation (that works with fs2 btw) is:

def autocommit[A](in: Source[Committable[A], NotUsed]): Source[A, NotUsed] =
  in.flatMapConcat { c =>
    // Emits nothing; exists only to run the commit once it is pulled
    val commit = Source.single(())
      .mapAsync(1)(_ => c.commit())
      .flatMapConcat(_ => Source.empty[A])
    Source.single(c.value).concat(commit)
  }

The problem is that c.commit() is called regardless of how c.value is processed by downstream. So the following code would commit:

autocommit(source).mapAsync(1)(_ => Future.failed(new RuntimeException))

As I understand from the docs, this is kinda expected behaviour due to materialization and buffering.

So is it possible to achieve what I want?


(Johannes Rudolph) #2

In akka-stream, a streaming pipeline is run asynchronously, so after an element has been sent to downstream you don’t get any feedback about what the downstream did with it. So, your approach seems to be doomed.

Instead, what you need to do is keep the Committable until after the processing of its value. This is somewhat hard to do because you will have to adapt every part of your processing pipeline to carry the Committable along.
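A minimal sketch of what keeping the Committable until after processing could look like, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer). The names processThenCommit, CommitAfterProcessing and demo are hypothetical, made up for illustration. The processing step lives inside the same mapAsync as the commit, so commit is only reached when processing succeeded:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

final case class Committable[A](commit: () => Future[Unit], value: A)

object CommitAfterProcessing {

  // Hypothetical helper: processes each value and commits only if processing succeeded.
  def processThenCommit[A, B](in: Source[Committable[A], NotUsed])(process: A => Future[B])(
      implicit ec: ExecutionContext): Source[B, NotUsed] =
    in.mapAsync(1) { c =>
      // flatMap is skipped when process fails, so commit is never called in that case
      process(c.value).flatMap(b => c.commit().map(_ => b))
    }

  // Runs three elements through the helper; processing of the third one fails.
  // Returns the values whose commit function was actually invoked.
  def demo(): List[Int] = {
    implicit val system: ActorSystem = ActorSystem("commit-after-processing")
    import system.dispatcher

    val committed = ListBuffer.empty[Int]
    def committable(i: Int): Committable[Int] =
      Committable(() => { committed.synchronized(committed += i); Future.successful(()) }, i)

    val done = processThenCommit(Source(List(1, 2, 3)).map(committable)) { i =>
      if (i == 3) Future.failed(new RuntimeException("processing failed"))
      else Future.successful(i * 10)
    }.runWith(Sink.ignore)

    Await.ready(done, 5.seconds) // the stream fails; wait without rethrowing
    Await.ready(system.terminate(), 5.seconds)
    committed.synchronized(committed.toList)
  }
}
```

The cost is the one described above: the processing logic has to be written against the Committable rather than as an independent downstream stage.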

So far, we have shied away from providing a general solution for the use case of transparently carrying along the context of an element. One reason is that not every stream operation can support this, because in general a Flow doesn't guarantee a one-to-one mapping between input and output elements (and then what do you do about graph stages with multiple inputs or outputs?).

We realize that this is a real issue for some kinds of applications. A while ago I started playing around with providing a specialized SourceWithContext that would offer automatic context propagation - for a limited set of stream operations. If we decide to go with that I guess this could help you in the future.
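For reference, later Akka releases do contain a SourceWithContext with this kind of limited context propagation. A hedged sketch of how the commit function could travel as context, assuming Akka 2.6+ and using hypothetical names (autocommit with an explicit process argument, ContextPropagation, demo):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

final case class Committable[A](commit: () => Future[Unit], value: A)

object ContextPropagation {

  // Hypothetical variant of autocommit: the commit function rides along as context
  // while `process` only ever sees the plain value.
  def autocommit[A, B](in: Source[Committable[A], NotUsed])(process: A => Future[B])(
      implicit ec: ExecutionContext): Source[B, NotUsed] =
    in.asSourceWithContext(c => c.commit) // context = this element's commit function
      .map(_.value)                       // data = the bare value
      .mapAsync(1)(process)               // context is carried past this step
      .asSource                           // back to a Source of (result, commit) pairs
      .mapAsync(1) { case (b, commit) => commit().map(_ => b) }

  // Processing of the second element fails; returns the values that were committed.
  def demo(): List[Int] = {
    implicit val system: ActorSystem = ActorSystem("context-propagation")
    import system.dispatcher

    val committed = ListBuffer.empty[Int]
    def committable(i: Int): Committable[Int] =
      Committable(() => { committed.synchronized(committed += i); Future.successful(()) }, i)

    val done = autocommit(Source(List(1, 2, 3)).map(committable)) { i =>
      if (i == 2) Future.failed(new RuntimeException("processing failed"))
      else Future.successful(i * 10)
    }.runWith(Sink.ignore)

    Await.ready(done, 5.seconds) // the stream fails; wait without rethrowing
    Await.ready(system.terminate(), 5.seconds)
    committed.synchronized(committed.toList)
  }
}
```

Only operations with a one-to-one element mapping (map, mapAsync, filter and similar) are available between asSourceWithContext and asSource, which matches the limitation mentioned above.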


(Denis Mikhaylov) #3

I see. Thank you for the reply. Really appreciate it :)