Kafka Draining Control with restart mechanism

alpakka
kafka
(Sigurd Sippel) #1

Hey guys,

I’m working with Alpakka (Slick, Cassandra, Kafka).
For consuming a Kafka topic I use the Draining Control combined with the new Committer Sink.
https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#committer-sink

That’s nice, but I’m unsure how to react to errors (by restarting) that will probably no longer exist after a restart (connection problems, …).
I would like to use the RestartSource, but a DrainingControl is not a Source or a CompletionStage.

What’s the trick? :smiley:

Best
Sigurd

(Enno) #2

Hi Sigurd,

Sorry for not answering earlier.

You’re right, there is no good way to get the materialized value out of the RestartSource. One way of getting access to the latest materialized control is to have an AtomicReference which is populated from the inner stream creation logic in mapMaterializedValue as shown in the docs: https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html
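
To make the AtomicReference idea concrete without pulling in any dependencies, here is a minimal, library-free sketch of the pattern. It is a model, not Alpakka code: Control, StreamBody and runWithRestarts are hypothetical names invented for illustration. In a real stream the reference would be set inside mapMaterializedValue of the inner source and the restarts would be driven by RestartSource with a backoff. The sketch treats java.util.concurrent.TimeoutException as the only transient failure, which is an assumption.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Library-free model of the pattern. Control, StreamBody and runWithRestarts
// are hypothetical stand-ins for illustration, not Akka or Alpakka API.
final class LatestControlSketch {

    // Stand-in for the handle a running Kafka consumer stream materializes.
    static final class Control {
        final int attempt;
        private volatile boolean stopped = false;

        Control(int attempt) { this.attempt = attempt; }

        void shutdown() { stopped = true; }

        boolean isStopped() { return stopped; }
    }

    // Stand-in for the inner stream; it may fail with any exception.
    interface StreamBody {
        void run(Control control) throws Exception;
    }

    // Runs the body, starting it again after a TimeoutException (treated here
    // as transient), up to maxAttempts. Returns the attempt that completed.
    static int runWithRestarts(AtomicReference<Control> latest, int maxAttempts, StreamBody body) {
        for (int attempt = 1; ; attempt++) {
            Control control = new Control(attempt); // every (re)start creates a new handle
            latest.set(control);                    // publish it before the body runs
            try {
                body.run(control);
                return attempt;
            } catch (Exception e) {
                boolean transientFailure = e instanceof TimeoutException;
                if (!transientFailure || attempt >= maxAttempts) {
                    throw new IllegalStateException("giving up after attempt " + attempt, e);
                }
                // A real restart would wait for a backoff here; omitted in this sketch.
            }
        }
    }
}
```

The caller creates the AtomicReference up front with a placeholder Control, passes it to runWithRestarts, and later calls latest.get().shutdown() from the shutdown hook. Because the reference is overwritten on every restart, the hook always reaches the handle of the instance that is currently running rather than one that has already failed.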

I hope that helps,
Enno.

(Sigurd Sippel) #3

Thank you for that hint, that’s interesting.

I currently use a decider function to restart the source.

akka.japi.function.Function<Throwable, Supervision.Directive> decider = exc -> {
    if (exc instanceof CoordinatorNotAvailableException) {
        return Supervision.restart();
    } else if (exc instanceof RetriableException) {
        return Supervision.restart();
    } else if (exc instanceof java.util.concurrent.TimeoutException) {
        return Supervision.restart();
    } else {
        return Supervision.stop();
    }
};

Additionally, I add a shutdown hook via CoordinatedShutdown that terminates the DrainingControl in the service-unbind phase (with the default phase configuration).
I’m a little unsure which phase is best. I now get failed graph stages on shutdown.