Prevent Lagom Kafka consumer restart

(Marc-Antoine Nüssli) #1


I have a service that listens to several Kafka topics and forwards the messages through a general processing stream that is common to all Kafka consumers. Currently, I achieve this using dynamic fan-in/fan-out stages (MergeHub/BroadcastHub), which in the end give me a single Flow that I can pass to several topic subscribers (as the argument to atLeastOnce) and which directs all messages to the single common stream.
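To make the setup concrete, here is a minimal sketch of the idea, simplified to a MergeHub only. The names `SharedPipeline`, `Event`, and `handle` are hypothetical and not taken from my actual code, and the real pipeline is more involved. Note that in this simplified version `Done` is emitted once a message has been handed to the hub, not once it has been fully processed.

```scala
import akka.{Done, NotUsed}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, MergeHub, Sink}

object SharedPipeline {
  // Hypothetical message type; the real payloads are not shown here.
  final case class Event(topic: String, body: String)

  /** Starts the common processing stream and returns a Flow that feeds it.
    * `handle` stands in for the real processing logic (an assumption).
    */
  def start(handle: Event => Unit)(implicit mat: Materializer): Flow[Event, Done, NotUsed] = {
    // Running the MergeHub source materializes a Sink that any number of
    // producers (here: the topic subscribers) can attach to.
    val hubSink: Sink[Event, NotUsed] =
      MergeHub
        .source[Event](perProducerBufferSize = 16)
        .to(Sink.foreach(handle))
        .run()

    // Every subscriber gets the same Flow: each message is also sent to the
    // hub, and a Done is emitted downstream for it.
    Flow[Event].alsoTo(hubSink).map(_ => Done)
  }
}
```

The returned Flow is what gets passed to each subscriber, e.g. `someTopic.subscribe.atLeastOnce(flow)`, where `someTopic` is any Lagom topic carrying `Event` payloads. Once the hub's stream has terminated, that Flow stays bound to the dead hub, which is the root of the problem described next.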

However, I am having trouble getting the whole system to restart automatically and correctly. The problem is that the main processing stream may fail, in which case the Flow registered when atLeastOnce was called is no longer valid. Hence, the Lagom subscribers will try indefinitely to restart their streams with an always-failing Flow.
More specifically, when my main stream fails (which also makes each Kafka subscriber stream fail), I restart it and get a new Flow, which I then need to provide to the Kafka subscribers. So I need to call atLeastOnce again on each Kafka subscriber with the new Flow.

It seems this solution would work well, but for it I would need to switch off the BackoffSupervisor used by the Lagom Kafka subscriber. Judging from the docs and the ScaladslKafkaSubscriber code, it does not seem possible to prevent this auto-restart behavior (although the Lagom docs suggest it is only a default behavior).
I know I could do it using alpakka-kafka directly, but I would prefer to avoid that if I can, to keep the code cleaner.

Does anyone know if there is a way to effectively prevent this Kafka subscriber restart, e.g. by playing with the backoff parameters?

Thanks in advance for any help or suggestion!