Kafka - Ability to react to the message publication result

Hello everyone,

We’ve tried to use the Alpakka Kafka library (https://doc.akka.io/docs/alpakka-kafka/current/home.html) to implement the following scenario:

  • some application parts must send messages to a Kafka topic (domain events)
  • each place that requests message publication to Kafka should be notified about either publication success or failure and handle both cases properly - as publication is an I/O operation and we don’t want to block, ideally we would receive a CompletableFuture; the interface of such a request could be:
CompletableFuture<KafkaProducerResult> publishMessageToKafka(final DomainEvent domainEvent)

Based on the existing components we designed the following stream:

Source.<Tuple2<DomainEvent, CompletableFuture<KafkaProducerResult>>>queue(QUEUE_BUFFER_SIZE, OverflowStrategy.backpressure())
                .map(tuple -> mapToProducerMessage(topicName, tuple))
                // publish to Kafka; the CompletableFuture travels along as the PassThrough element
                .via(Producer.flexiFlow(kafkaProducerSettings))
                .map(result -> {
                    // on success, complete the future the caller is waiting on
                    final CompletableFuture<KafkaProducerResult> completableFuture = result.passThrough();
                    return completableFuture.complete(new KafkaProducerResult(KafkaProducerResult.Result.PUBLISHED));
                })
                .to(Sink.ignore())
                .run(system);

This solution works well in happy-path scenarios, but it doesn’t allow us to complete the CompletableFuture in case of publication failures.
Is it possible to somehow receive the result of publication from Producer.flexiFlow (or any other producer component available in the library), including publication failures, together with the provided PassThrough element?
I think the presented use case is very common in production, where we want to handle both possible results of message publication.

Thank you in advance for any help and suggestions! :slight_smile:

Another question: is there any way to recover without completing the stream, while keeping the possibility to transform the element that failed?
I would like to convert an exception from Producer.flexiFlow into a value that can be passed on as the next stream element.

  • using recover allows transforming the exception into a value, but it completes the stream (illustrated below)
  • using supervision strategies allows me to Resume on an exception, but I’m not able to emit any value for the problematic element - it is simply dropped

In both cases I’m losing the PassThrough value.
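
A minimal, self-contained illustration of the first point (the values are arbitrary): recover turns the failure into one last element and then completes the stream, so everything behind the failed element is lost.

import java.util.Arrays;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;

public class RecoverCompletesStream {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        Source.from(Arrays.asList(1, 2, 3))
                .map(n -> {
                    if (n == 2) throw new RuntimeException("boom");
                    return n;
                })
                // prints 1 and then -1; the element 3 is never emitted
                .recover(RuntimeException.class, () -> -1)
                .runForeach(System.out::println, system);
    }
}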

Hi @sebarys,

Is it possible to somehow receive the result of publication from Producer.flexiFlow (or any other producer component available in the library), including publication failures, together with the provided PassThrough element?

Another question: is there any way to recover without completing the stream, while keeping the possibility to transform the element that failed?

If an exception is thrown by the Alpakka Kafka producer stage (i.e. Producer.flexiFlow) then there is little that can be done to recover from it. These exceptions should only be related to Kafka Producer serialization issues or connection problems. The general advice is to just let the stream fail in these cases and let a RestartSource bring it back to life, or let the process crash entirely and have some other service reschedule it, such as the Kubernetes pod scheduler if this were running on Kubernetes. For an example using RestartSource check out the Error Handling page of the Alpakka Kafka docs.
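
For illustration, a minimal sketch of that advice (createStream() is a placeholder for whatever builds your queue -> Producer.flexiFlow pipeline as a Source; the backoff values are arbitrary):

import java.time.Duration;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;

RestartSource
        .onFailuresWithBackoff(
                Duration.ofSeconds(1),   // minBackoff
                Duration.ofSeconds(30),  // maxBackoff
                0.2,                     // random jitter added to each backoff
                () -> createStream())    // the whole stream is recreated after each failure
        .runWith(Sink.ignore(), system);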

If an exception occurs in other operators that you have control over and you would like to recover within the stream then I would recommend implementing your own exception handling within that stage yourself. For example, within a map operator wrap your logic in a Try and handle it immediately or emit the Success or Failure result downstream as an element.
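
For illustration, a sketch of that pattern in the Java DSL (ProcessingResult is a hypothetical wrapper type, not an Alpakka API):

import akka.NotUsed;
import akka.stream.javadsl.Flow;

// a tiny sum type carrying either the result or the failure downstream as a regular element
final class ProcessingResult<T> {
    final T value;            // set on success
    final Throwable failure;  // set on failure

    private ProcessingResult(T value, Throwable failure) {
        this.value = value;
        this.failure = failure;
    }

    static <T> ProcessingResult<T> success(T value) { return new ProcessingResult<>(value, null); }
    static <T> ProcessingResult<T> failure(Throwable t) { return new ProcessingResult<>(null, t); }
    boolean isSuccess() { return failure == null; }
}

// a flow that parses integers; an exception becomes an element instead of failing the stream
Flow<String, ProcessingResult<Integer>, NotUsed> safeParse =
        Flow.<String>create()
                .map(s -> {
                    try {
                        return ProcessingResult.success(Integer.parseInt(s));
                    } catch (NumberFormatException e) {
                        return ProcessingResult.failure(e);
                    }
                });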

I would like to convert an exception from Producer.flexiFlow into a value that can be passed on as the next stream element.

In your code example you’re wrapping the result in a CompletableFuture, but that map operator will always complete the future successfully. If an unhandled exception occurs in the stream then the Future returned by the .run operation will return the Failure.

Also keep in mind that errors/exceptions in streams propagate out-of-band, separately from the elements. It’s possible for an error to arrive downstream before elements that were emitted ahead of the one that caused it. Therefore it’s never a good idea to recover the stream (i.e. switch to a different source with recoverWith) if you want to make sure you don’t lose messages.

Hi @seglo

Thanks for the reply!

The problem I see with restarting - a RestartFlow around Producer.flexiFlow(kafkaProducerSettings) in this case - can be described by the following example:

  • I have 3 messages in the SourceQueue that should be published to Kafka
  • after publishing the 1st message I lose the connection to Kafka
  • processing the 2nd message causes an error, e.g. org.apache.kafka.common.errors.TimeoutException
  • the Flow will be restarted, but I’m losing the 2nd message and have no way to notify other parts of the application via CompletableFuture::complete on the PassThrough element attached to this message

As Producer.flexiFlow swallows the PassThrough element in case of failure, I’m not able to react to problems with the messages that are in flight at that moment.

I was wondering whether Kafka Producer related exceptions shouldn’t result in something like

ProducerMessage.PublicationFailedResult(ex: Throwable, failedElement: Envelope[K, V, PassThrough]) extends Results[K, V, PassThrough]

This would allow interpreting publication-related exceptions and handling them accordingly.

Hi @sebarys,

I see. Instead of using RestartFlow I think you should let the stream fail and then, on recovery, read the last message produced to Kafka and use that to determine what to populate your SourceQueue with on startup. If you don’t want to read from Kafka then you could do your own bookkeeping, tracking which source messages have been successfully produced by watching the last successful result emitted downstream of the producer; but in a producer failure scenario there’s no guarantee that a message was not written to the broker before the client shut down or received an acknowledgement, unless you’re using Kafka Transactions.

Ultimately these committing semantics are best handled by consuming from and producing to topics on the same Kafka cluster. Then source messages will contain an offset that can be used to acknowledge that a message was successfully processed (i.e. processed and produced to a topic) using the Committer.flow or Producer.committableSink (or their variants). In use cases like yours it’s your responsibility to dedupe what is sent to Kafka, or dedupe later in your workflow, or just tolerate processing duplicates.

Here’s an example of an at least once committing workflow I discussed earlier: https://doc.akka.io/docs/alpakka-kafka/current/atleastonce.html
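
Roughly, the consume-transform-produce-commit shape from that page looks like this (the settings objects and topic names are placeholders):

import akka.Done;
import akka.kafka.ProducerMessage;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.kafka.javadsl.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("source-topic"))
        .map(msg ->
            ProducerMessage.single(
                new ProducerRecord<>("target-topic", msg.record().key(), msg.record().value()),
                // the offset travels as the PassThrough and is committed once the send succeeded
                msg.committableOffset()))
        .toMat(Producer.committableSink(producerSettings, committerSettings), Consumer::createDrainingControl)
        .run(system);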

Regards,
Sean

Thanks @seglo

Unfortunately your hints don’t match my use case. Maybe more context will give you a better understanding:
We would like to provide a component that allows our web applications to publish notifications to Kafka. Each HTTP request to the webserver, or the result of consuming a message from RabbitMQ/Kafka, can result in a notification that should be published.
As we have some experience with Akka and think that Akka Streams with its built-in backpressure could be of benefit here, we’ve decided to give it a try and use it as the library underneath this Kafka publisher component.
All applications are JVM based but use different technologies: RxJava, Reactor, Akka HTTP, Vert.x, so communication between the publisher component and other parts of the applications must happen via a common interface - a CompletableFuture in this case.

As you can see, I don’t have a fixed number of messages that can be added to the SourceQueue on start - elements are added dynamically, and there is no way to re-populate the SourceQueue in case of a stream failure.
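
Concretely, the bridging method from my first post looks roughly like this (a sketch, using akka.japi.Pair in place of Tuple2; queue is the materialized SourceQueueWithComplete of the publisher stream, and failing the future on a rejected offer is our own convention):

import java.util.concurrent.CompletableFuture;
import akka.japi.Pair;
import akka.stream.QueueOfferResult;
import akka.stream.javadsl.SourceQueueWithComplete;

public CompletableFuture<KafkaProducerResult> publishMessageToKafka(final DomainEvent domainEvent) {
    final CompletableFuture<KafkaProducerResult> resultFuture = new CompletableFuture<>();
    queue.offer(Pair.create(domainEvent, resultFuture))
        .whenComplete((offerResult, failure) -> {
            // if the element never made it into the stream, complete the future here;
            // otherwise the stream completes it after the send
            if (failure != null || !QueueOfferResult.enqueued().equals(offerResult)) {
                resultFuture.complete(new KafkaProducerResult(KafkaProducerResult.Result.FAILED));
            }
        });
    return resultFuture;
}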

Pseudocode of what we would like to achieve:

sourceQueue
                .map(msgToPublish -> convertToProducerMsg(topicName, msgToPublish))
                .via(Producer.flexiFlow(kafkaProducerSettings)) 
                .map(result -> {
                    if(result.isSuccess()) {
                        return result.passThrough().complete(KafkaProducerResult.PUBLISHED);
                    } else {
                        LOG.error("Error during publication {} to kafka. Exception: {}", result.originalMsg(), result.failureCause())
                        return result.passThrough().complete(KafkaProducerResult.FAILED);
                    }
                })
                .to(Sink.ignore())
                .run(system);

Unfortunately Producer.flexiFlow has no possibility to produce an element like:

ProducerMessage.PublicationFailedResult(ex: Throwable, failedElement: Envelope[K, V, PassThrough]) extends Results[K, V, PassThrough]

to react not only to publication successes, but also to failures.

As the stream preserves order, we figured out a hacky solution:

sourceQueue
    .mapConcat(msgToPublishWithCompletableFuture -> {
        // emit two elements per message: the actual record to publish, followed by a
        // pass-through marker that is never sent to Kafka but flows through the producer stage
        return Arrays.asList( //
            ProducerMessage.single(new ProducerRecord<>(topicName, msgToPublishWithCompletableFuture.getMessageToPublish()), msgToPublishWithCompletableFuture),
            ProducerMessage.passThrough(msgToPublishWithCompletableFuture)
        );
    })
    .via(Producer.flexiFlow(kafkaProducerSettings))
    // fold emits only once, when the stream completes, but its aggregation function runs
    // for every element, so the CompletableFutures are completed as elements arrive
    .fold(Optional.empty(), (state, element) -> {
        // publication succeeded and state is empty -> store the successful publication's
        // passthrough value as state and wait for the associated ProducerMessage.PassThroughMessage
        if (element instanceof ProducerMessage.Result && state.isEmpty()) {
            return Optional.of(element.passThrough());
        }
        // received the passthrough element associated with the state value -> publication succeeded
        else if (element instanceof ProducerMessage.PassThroughResult &&
                state.isPresent() &&
                state.get().equals(element.passThrough()))
        {
            element.passThrough().getPublicationResultFuture().complete(PublicationResult.PUBLISHED);
            return Optional.empty();
        }
        // received a passthrough element while the state is empty -> publication of the associated message failed
        else if (element instanceof ProducerMessage.PassThroughResult && state.isEmpty())
        {
            LOG.warn("[PUBLICATION FAILED] Received {}, when state is empty - publication associated with existing state failed", //
                    ((ProducerMessage.PassThroughResult) element).passThrough());
            element.passThrough().getPublicationResultFuture().complete(PublicationResult.FAILED);
            return Optional.empty();
        }
        // as we first receive ProducerMessage.Message and then ProducerMessage.PassThroughMessage,
        // and the stream preserves order, other cases are not possible
        else
        {
            LOG.error("[CRITICAL] Illegal state of stream: Received unexpected {} element when state was {}", element, state);
            return Optional.empty();
        }
    })
    .to(Sink.ignore())
    .run(system);
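
For completeness: this trick relies on the failing single message being dropped while the stream keeps running, i.e. on the resuming supervision strategy mentioned earlier being attached to the producer stage, roughly:

import akka.stream.ActorAttributes;
import akka.stream.Supervision;

// instead of the plain .via(...) above: resume (drop the failing element) so that the
// ProducerMessage.passThrough marker still reaches the fold and completes the future as FAILED
.via(Producer.flexiFlow(kafkaProducerSettings)
        .withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider())))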

As I wrote above, it looks a bit hacky to me, and maybe it would be worth considering adding a Producer.flexiFlow variant that produces an element both on publication success and on failure (for Kafka Producer related exceptions).

Hi @sebarys,

You’re right, that is not easily achievable with Alpakka Kafka today. Akka Streams, in a way, inherits its approach to error handling from Akka Actors: once you hit an error, it’s better to start over from a well-known state than to try to fix things up. That is why errors in most cases don’t get passed along as regular stream elements, but travel in the streams’ infrastructure.

Some authors of Alpakka connectors decided to make the errors first-class instead. With that, it becomes the responsibility of the programmer to explicitly check whether an error occurred.

Alpakka Kafka does not allow that. I believe support for it could be added, but it would require another implementation of Results, which would break backwards compatibility.

Your use case could make use of the non-streaming helper I merged today. It gives you a CompletionStage for every send. But you lose the backpressure that Akka Streams gives you, and the producer’s ability to batch sends when you sequence the futures properly.

See https://github.com/akka/alpakka-kafka/pull/1085
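
A minimal sketch of using it (this helper became the SendProducer in the javadsl; the example is roughly per its docs, with placeholder topic, key, and value):

import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import akka.kafka.javadsl.SendProducer;

SendProducer<String, String> sendProducer = new SendProducer<>(kafkaProducerSettings, system);

CompletionStage<RecordMetadata> result =
    sendProducer.send(new ProducerRecord<>("topic", "key", "value"));
// the CompletionStage completes normally for an acknowledged send and
// exceptionally on a producer failure - both outcomes are observable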

Cheers,
Enno.


Thanks for the information about this helper!

For now we have this “hacky” working solution that supports backpressure, but this helper will definitely be useful for testing or for integration with non-streaming existing applications.

I think it could be done without breaking changes by adding a Producer.flow variant (don’t want to think about the name now) that returns Either[ProducerMessage.PublicationFailure, ProducerMessage.Results],
or by having another trait like

sealed trait ResultsWithError[K, V, PassThrough] {
  def passThrough: PassThrough
}

that could be extended both by ProducerMessage.Results and ProducerMessage.PublicationFailure,
but it would be good to first check how this is done in other Alpakka connectors, so as not to introduce something that handles emitted error results in a different way.


I’ve created a feature request on the GitHub project describing this idea: https://github.com/akka/alpakka-kafka/issues/1101
