Completing a GraphStage and getting notified of downstream completion

Hi!

I have a graph that I create with the GraphDSL in Java.

The structure of the graph is something like this:

(S1) (S2) ... (SN)              (SourceQueue)
      |  |  |                        |
  (connect to merge)                 | (via)
          \                 StreamCompleter GraphStage
           \                    / (connect to merge preferred)
            (MergePreferred)
                   |
         ProcessorStage GraphStage
                   |
          Producer.plainSink

Where S1 -> SN are RabbitMQ sources, and the ProcessorStage graph stage outputs ProducerRecords.

I create the MergePreferred with eagerComplete = true.

The StreamCompleter is a graph stage which is basically a KillSwitch.
It receives a message via a SourceQueue and completes the stream, but not before sending a piece of data to another place in the app.

What I want to be able to do is to only send this data after the Kafka plainSink has completed.

I refactored my code a bit to send the data in GraphStageLogic#postStop,
but I'm not sure this gives me the guarantee that the Kafka sink has flushed all the records it had.

I'm not that good at Akka; this is a pretty complicated matter, and we don't do enough of it in our company to know the specifics.

I would appreciate some help with understanding the lifecycle methods involved here,
e.g. when exactly is postStop called compared to when the Kafka sink completes itself?

Thanks for helping, be safe and healthy!

Hi @gioragutt,

The most conservative view is to see every stage as an independently executing, asynchronous component. Often-observed sequences of events (especially with regard to completion) might just be an implementation detail and cannot generally be relied on.

plainSink materializes to a CompletionStage<Done> that can be used to determine that the stage has completed. That would be the most reliable way to detect that. It might still be a bit complicated to feed that value to the right place.
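For illustration, a minimal sketch of keeping and reacting to that materialized value (the serializers, topic, and bootstrap servers are placeholders, not from your setup):

```java
import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.concurrent.CompletionStage;

public class PlainSinkCompletionSketch {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("plain-sink-completion");

    // Placeholder settings; use your own serializers and brokers.
    ProducerSettings<String, String> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
            .withBootstrapServers("localhost:9092");

    // Keep the sink's materialized CompletionStage<Done>.
    CompletionStage<Done> done =
        Source.single(new ProducerRecord<String, String>("some-topic", "some-value"))
            .runWith(Producer.plainSink(producerSettings), system);

    // Completes once the producer stage has stopped; a safe point to react.
    done.whenComplete((ok, failure) -> system.terminate());
  }
}
```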

Johannes


Hi, @gioragutt

As far as I know, graph processing works in "stages of processing"; see https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html#operator-fusion to learn more about that.

Another important thing to know when using Akka Streams is the reactivity of a stream. The source will only send data if the sink can process it, and overflowing data can be handled in many ways; see https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html#back-pressure-explained =D.
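A tiny sketch of that demand-driven behaviour (a hypothetical example, not from this thread): the range could emit instantly, but it is slowed to the throttled rate because demand propagates upstream.

```java
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.time.Duration;

public class BackpressureSketch {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("backpressure-demo");

    // throttle only signals demand for 10 elements per second downstream,
    // so the source is slowed to that rate instead of flooding the sink.
    Source.range(1, 100)
        .throttle(10, Duration.ofSeconds(1))
        .runWith(Sink.foreach(System.out::println), system)
        .thenRun(system::terminate);
  }
}
```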

I hope this can be helpful.

Regards.


Hi @jrudolph, thanks for your helpful comment!

I've experimented with your suggestion about the CompletionStage<Done>.

In my GraphDSL builder, we pass the plainSink and a SourceQueue.
Previously, we used Keep.right() to keep the SourceQueue's materialized value, the SourceQueueWithComplete, which we used to kill the graph.

We've now changed Keep.right() to Keep.both(), which returns the SourceQueueWithComplete and the CompletionStage<Done> in a pair.

I later use that completion stage to trigger my custom “shutdown” logic.
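Stripped of the GraphDSL wiring, the change looks roughly like this (a simplified, self-contained sketch; the settings, buffer size, and element type are placeholders for our real ones):

```java
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.concurrent.CompletionStage;

public class KeepBothSketch {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("keep-both");
    ProducerSettings<String, String> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
            .withBootstrapServers("localhost:9092");

    // Keep.right() would keep only the queue; Keep.both() keeps the queue
    // AND the sink's CompletionStage<Done> as a Pair.
    Pair<SourceQueueWithComplete<ProducerRecord<String, String>>, CompletionStage<Done>> mat =
        Source.<ProducerRecord<String, String>>queue(100, OverflowStrategy.backpressure())
            .toMat(Producer.plainSink(producerSettings), Keep.both())
            .run(system);

    SourceQueueWithComplete<ProducerRecord<String, String>> queue = mat.first();
    CompletionStage<Done> sinkDone = mat.second();

    // Run the custom "shutdown" logic only after the sink has completed.
    sinkDone.whenComplete((done, failure) -> system.terminate());

    queue.offer(new ProducerRecord<>("some-topic", "some-value"));
    queue.complete(); // completing the queue lets the stream drain and finish
  }
}
```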

Does this seem right? Can I be 100% sure that when the whenComplete callback of the plainSink's CompletionStage is called, it will no longer send ANY messages to Kafka?

If so, you’ve been most helpful!

Just a fun fact: as a person who deals a lot with asynchrony in JavaScript, having to use a CompletableFuture<CompletableFuture<Done>> (in order to be able to pass the plainSink completion stage to the StreamCompleter graph stage logic) made sense to me, but I'm sure it still baffles some of my co-workers :sweat_smile:

Hi @gioragutt,

Does this seem right? Can I be 100% sure that when the whenComplete callback of the plainSink's CompletionStage is called, it will no longer send ANY messages to Kafka?

As soon as Alpakka Kafka producer stages receive a shutdown signal in the form of upstream completion, cancellation, or error, no more elements are pushed to the producer stage and nothing else will be produced. This is a property of Akka Streams itself. If you look at the documentation for signal callbacks on Inlets and Outlets in custom graph stages (onUpstreamFinish, onDownstreamFinish, etc.), it states that:

After this callback no other callbacks will be called for this port.

The Alpakka Kafka producer stage will attempt to produce elements as soon as they arrive, so when they stop, producing stops.
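To illustrate that contract, here is a minimal pass-through GraphStage sketch (hypothetical, following the documented Java pattern for custom stages): once onUpstreamFinish fires for the inlet, no further onPush callbacks arrive on it.

```java
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;

public class PassThrough<T> extends GraphStage<FlowShape<T, T>> {
  private final Inlet<T> in = Inlet.create("PassThrough.in");
  private final Outlet<T> out = Outlet.create("PassThrough.out");
  private final FlowShape<T, T> shape = FlowShape.of(in, out);

  @Override
  public FlowShape<T, T> shape() {
    return shape;
  }

  @Override
  public GraphStageLogic createLogic(Attributes inheritedAttributes) {
    return new GraphStageLogic(shape) {
      {
        setHandler(in, new AbstractInHandler() {
          @Override
          public void onPush() {
            push(out, grab(in));
          }

          @Override
          public void onUpstreamFinish() {
            // The last signal for `in`: after this, no other callbacks
            // (in particular no onPush) will be called for this port.
            completeStage();
          }
        });
        setHandler(out, new AbstractOutHandler() {
          @Override
          public void onPull() {
            pull(in);
          }
        });
      }
    };
  }
}
```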

If you were using Alpakka Kafka sources together with Alpakka Kafka sinks and using Kafka to manage offsets, the ideal shutdown process would be a little more involved. You can read about how this process is managed with the DrainingControl in Alpakka Kafka, but I don't think it's applicable to your use case because the messages you're producing are not provided by an Alpakka source.
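For reference, that DrainingControl pattern looks roughly like this (a sketch with placeholder topics, group id, and settings, assuming the Alpakka Kafka 2.0 javadsl):

```java
import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.ProducerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.kafka.javadsl.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class DrainingControlSketch {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("draining");

    ConsumerSettings<String, String> consumerSettings =
        ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
            .withBootstrapServers("localhost:9092")
            .withGroupId("example-group");
    ProducerSettings<String, String> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
            .withBootstrapServers("localhost:9092");

    // Combine the source's Control and the sink's CompletionStage<Done>
    // into a DrainingControl for coordinated shutdown.
    Consumer.DrainingControl<Done> control =
        Consumer.plainSource(consumerSettings, Subscriptions.topics("in-topic"))
            .map(record -> new ProducerRecord<String, String>("out-topic", record.value()))
            .toMat(Producer.plainSink(producerSettings), Consumer::createDrainingControl)
            .run(system);

    // On shutdown: stop consuming, let in-flight records drain, then complete.
    control.drainAndShutdown(system.getDispatcher());
  }
}
```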

Regards,
Sean