Kafka - Ability to react to the message publication result

Hello everyone,

We’ve tried to use the Alpakka Kafka library (https://doc.akka.io/docs/alpakka-kafka/current/home.html) to implement the following scenario:

  • some application parts must send messages to a Kafka topic (domain events)
  • each place that requests message publication to Kafka should be notified about publication success or failure and handle both cases properly - as publication is an I/O operation and we don’t want to block, ideally we would receive a CompletableFuture; the interface of such a request could be:
CompletableFuture<KafkaProducerResult> publishMessageToKafka(final DomainEvent domainEvent)

Based on existing components we designed the following stream:

Source.<Tuple2<DomainEvent, CompletableFuture<KafkaProducerResult>>>queue(QUEUE_BUFFER_SIZE, OverflowStrategy.backpressure())
                .map(tuple -> mapToProducerMessage(topicName, tuple))
                .via(Producer.flexiFlow(kafkaProducerSettings)) 
                .map(result -> {
                    final CompletableFuture<KafkaProducerResult> completableFuture = result.passThrough();
                    return completableFuture.complete(new KafkaProducerResult(KafkaProducerResult.Result.PUBLISHED));
                })
                .to(Sink.ignore())
                .run(system);

Such a solution works well in happy-path scenarios, but it doesn’t allow us to complete the CompletableFuture in case of publication failures.
Is it possible to somehow receive from Producer.flexiFlow (or any other producer component available in the library) the result of the publication - including publication failures - together with the provided PassThrough element?
I think the presented use case is very common in production, where we want to handle both possible results of message publication.

Thank you in advance for any help and suggestions! :slight_smile:

Another question: is there any way to recover without completing the stream, while having the possibility to transform the element that failed?
I would like to convert an exception from Producer.flexiFlow into a value that can be passed as the next stream element.

  • using recover allows transforming an exception into a value, but it completes the stream
  • using supervision strategies allows me to Resume on an exception, but I’m not able to emit any value for the problematic element - it will simply be dropped

In both cases I’m losing the PassThrough value.

Hi @sebarys,

Is it possible to somehow receive from Producer.flexiFlow (or any other producer component available in the library) the result of the publication - including publication failures - together with the provided PassThrough element?

Another question: is there any way to recover without completing the stream, while having the possibility to transform the element that failed?

If an exception is thrown by the Alpakka Kafka producer stage (i.e. Producer.flexiFlow) then there is little that can be done to recover from it. These exceptions should only be related to Kafka Producer serialization issues or connection problems. The general advice is to just let the stream fail in these cases and let a RestartSource bring it back to life, or let the process crash entirely and have some other service reschedule it, such as the Kubernetes pod scheduler if this were running on Kubernetes. For an example using RestartSource check out the Error Handling page of the Alpakka Kafka docs.

If an exception occurs in other operators that you have control over and you would like to recover within the stream then I would recommend implementing your own exception handling within that stage yourself. For example, within a map operator wrap your logic in a Try and handle it immediately or emit the Success or Failure result downstream as an element.
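For illustration, a minimal sketch of that pattern in Scala could look like the following (DomainEvent and the parse step are hypothetical stand-ins for your own types, not part of Alpakka Kafka):

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.util.{Failure, Success, Try}

object TryInMapExample extends App {
  implicit val system: ActorSystem = ActorSystem("try-in-map")

  final case class DomainEvent(payload: String) // placeholder for the real event type

  // Hypothetical risky step that may throw (e.g. payload parsing / validation).
  def parse(raw: String): DomainEvent =
    if (raw.nonEmpty) DomainEvent(raw) else throw new IllegalArgumentException("empty payload")

  Source(List("msg-1", "", "msg-2"))
    .map(raw => Try(parse(raw)))  // the exception is captured instead of failing the stream
    .runForeach {
      case Success(event) => println(s"processed $event")    // handle the success case
      case Failure(ex)    => println(s"failed element: $ex")  // the failed element is not dropped
    }
}

This keeps failed elements (and any pass-through data attached to them) inside the stream instead of dropping them via supervision.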

I would like to convert an exception from Producer.flexiFlow into a value that can be passed as the next stream element.

In your code example you’re wrapping the result in a CompletableFuture, but this map operator will always return a successfully completed future. If an unhandled exception occurs in the stream then the Future returned by the .run operation will be completed with the Failure.

Also keep in mind that errors/exceptions in streams propagate out-of-band from elements. It’s possible for an error to arrive downstream before elements that were emitted before the element that caused it. Therefore it’s never a good idea to recover the stream (i.e. with a different source with recoverWith) if you want to make sure you don’t lose messages.

Hi @seglo

Thanks for the reply!

The problem I see with restarting (a RestartFlow around Producer.flexiFlow(kafkaProducerSettings) in this case) could be described by the following example:

  • I have 3 messages that should be published to Kafka sitting in the SourceQueue
  • after publishing the 1st message I lose the connection to Kafka
  • processing the 2nd message causes an error, e.g. org.apache.kafka.common.errors.TimeoutException
  • the Flow will be restarted, but I’m losing the 2nd message and have no possibility to notify other parts of the application using CompletableFuture::complete on the PassThrough element attached to this message

As Producer.flexiFlow swallows the PassThrough element in case of failure, I’m not able to react to any problems with the messages that are being processed at that moment.

I was wondering if Kafka producer-related exceptions shouldn’t result in something like

ProducerMessage.PublicationFailedResult(ex: Throwable, failedElement: Envelope[K, V, PassThrough]) extends Results[K, V, PassThrough]

This would allow interpreting publication-related exceptions and handling them accordingly.

Hi @sebarys,

I see. Instead of using RestartFlow I think you should let the stream fail and then, on recovery, read the last message produced to Kafka and use that to determine what to populate your SourceQueue with on startup. If you don’t want to read from Kafka then you could do your own bookkeeping of which source messages have been successfully produced, by tracking the last successful message sent downstream from the producer. But in a producer failure scenario there’s no guarantee that a message was not written to the broker before the client shuts down or receives an acknowledgement, unless you’re using Kafka Transactions.

Ultimately these committing semantics are best handled by consuming from and producing to topics on the same Kafka cluster. Then source messages will contain an offset that can be used to acknowledge that a message was successfully processed (i.e. processed and produced to a topic) using the Committer.flow or Producer.committableSink (or their variants). In use cases like yours it’s your responsibility to dedupe what is sent to Kafka, or dedupe later in your workflow, or just tolerate processing duplicates.

Here’s an example of an at least once committing workflow I discussed earlier: https://doc.akka.io/docs/alpakka-kafka/current/atleastonce.html
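In code, the shape of that at-least-once workflow is roughly the following (a sketch only - topic names, group id and serializers are placeholders, and the settings are simplified compared to the documentation):

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object AtLeastOnceSketch extends App {
  implicit val system: ActorSystem = ActorSystem("at-least-once")

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      // the committable offset travels as the passThrough; it is committed only
      // after the record has been produced to the target topic
      ProducerMessage.single(
        new ProducerRecord[String, String]("target-topic", msg.record.key, msg.record.value),
        msg.committableOffset)
    }
    .runWith(Producer.committableSink(producerSettings, CommitterSettings(system)))
}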

Regards,
Sean

Thanks @seglo

Unfortunately your hints don’t match my use case. Maybe more context will give you a better understanding:
We would like to provide a component that will allow our web applications to publish notifications to Kafka. Each HTTP request to a webserver, or the result of consuming a message from RabbitMQ/Kafka, can result in a notification that should be published.
As we have some experience with Akka and think that Akka Streams, with its built-in backpressure, could be beneficial here, we’ve decided to give it a try and use it as the library underneath this Kafka publisher component.
All applications are JVM-based but use different technologies (RxJava, Reactor, Akka HTTP, Vert.x), so communication between the publisher component and other parts of the applications must be done via a common interface - CompletableFuture in this case.

As you can see, I don’t have a fixed number of messages that can be added to the SourceQueue on start - elements are added dynamically and there is no way to re-populate the SourceQueue in case of a stream failure.

Pseudo code of what we would like to achieve:

sourceQueue
                .map(msgToPublish -> convertToProducerMsg(topicName, msgToPublish))
                .via(Producer.flexiFlow(kafkaProducerSettings)) 
                .map(result -> {
                    if(result.isSuccess()) {
                        return result.passThrough().complete(KafkaProducerResult.PUBLISHED);
                    } else {
                        LOG.error("Error during publication {} to kafka. Exception: {}", result.originalMsg(), result.failureCause())
                        return result.passThrough().complete(KafkaProducerResult.FAILED);
                    }
                })
                .to(Sink.ignore())
                .run(system);

Unfortunately Producer.flexiFlow doesn’t have the possibility to produce an element like:

ProducerMessage.PublicationFailedResult(ex: Throwable, failedElement: Envelope[K, V, PassThrough]) extends Results[K, V, PassThrough]

to react not only to publication success, but also to failures.

As the stream preserves order, we figured out a hacky solution:

sourceQueue
    .mapConcat(msgToPublishWithCompletableFuture -> {
        return Arrays.asList( //
            ProducerMessage.single(new ProducerRecord<>(topicName, msgToPublishWithCompletableFuture.getMessageToPublish()), msgToPublishWithCompletableFuture),
            ProducerMessage.passThrough(msgToPublishWithCompletableFuture)
        );
    })
    .via(Producer.flexiFlow(kafkaProducerSettings)) 
    .fold(Optional.empty(), (state, element) -> {
        // publication succeeded and state is empty -> adding successful publication passthrough value as state
        // and waiting for associated ProducerMessage.PassThroughMessage
        if (element instanceof ProducerMessage.Result && state.isEmpty()) {
            return Optional.of(element.passThrough());
        }
        // received passthrough element associated with the state value -> publication succeeded
        else if (element instanceof ProducerMessage.PassThroughResult &&
                state.isPresent() &&
                state.get().equals(element.passThrough()))
        {
            element.passThrough().getPublicationResultFuture().complete(PublicationResult.PUBLISHED);
            return Optional.empty();
        }
        // received passthrough element when state is empty -> publication of associated message failed
        else if (element instanceof ProducerMessage.PassThroughResult && state.isEmpty())
        {
            LOG.warn("[PUBLICATION FAILED] Received {}, when state is empty - publication associated with existing state failed", //
                    ((ProducerMessage.PassThroughResult) element).passThrough());
            element.passThrough().getPublicationResultFuture().complete(PublicationResult.FAILED);
            return Optional.empty();
        }
        // as we first receive ProducerMessage.Message and then ProducerMessage.PassThroughMessage
        // and the stream preserves order, other cases are not possible
        else
        {
            LOG.error("[CRITICAL] Illegal state of stream: Received unexpected {} element when state was {}", element, state);
            return Optional.empty();
        }
    })
    .to(Sink.ignore())
    .run(system);

As I wrote above, it looks a bit hacky to me, and maybe it would be worth considering adding a Producer.flexiFlow variant which produces an element both on publication success and on failure (for Kafka producer-related exceptions).

Hi @sebarys,

You’re right, that is not easily achievable with Alpakka Kafka today. Akka Streams, in a way, inherit their approach to error handling from Akka Actors: once you hit an error, it is better to start over from a well-known state than to try to fix it. That is why errors in most cases don’t get passed as regular stream elements, but travel in the streams’ infrastructure.

Some authors of Alpakka connectors decided to make the errors first-class, instead. With that it becomes the responsibility of the programmers to explicitly check if an error occurred.

Alpakka Kafka does not allow that. I believe support for it could be added, but that would require another implementation of Results which would break backwards-compatibility.

Your use case could make use of the non-streaming helper I merged today. It gives you a CompletionStage for every send. But you lose the backpressure that Akka Streams gives you and, when you sequence the futures one after another, the producer’s capability to batch the sending of messages.

See https://github.com/akka/alpakka-kafka/pull/1085
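Roughly, usage looks like this (assuming the helper from that PR is the SendProducer; the topic name, key and settings below are just placeholders):

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.{Failure, Success}

object SendProducerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("send-producer")
  import system.dispatcher

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  val sendProducer = SendProducer(producerSettings)

  // Each send returns a Future that completes with the broker metadata on success
  // and fails with the producer exception on failure - both outcomes are observable.
  sendProducer
    .send(new ProducerRecord("domain-events", "event-key", "event-payload"))
    .onComplete {
      case Success(metadata) => println(s"published to ${metadata.topic}-${metadata.partition}@${metadata.offset}")
      case Failure(ex)       => println(s"publication failed: $ex")
    }
}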

Cheers,
Enno.

Thanks for the information about this helper!

For now we have this “hacky” working solution that supports backpressure, but this helper will definitely be useful for testing or for integration with non-streaming existing applications.

I think it could be done without breaking changes by adding a Producer.flow variant (I don’t want to think about the name now) that returns Either[ProducerMessage.PublicationFailure, ProducerMessage.Results],
or by having another trait like

sealed trait ResultsWithError[K, V, PassThrough] {
  def passThrough: PassThrough
}

That could be extended both by ProducerMessage.Results and ProducerMessage.PublicationFailure, but it would be good to first check how it is done in other Alpakka connectors, so as not to introduce something that handles an emitted error result in a different way.

I’ve created a feature request on the GitHub project describing this idea: https://github.com/akka/alpakka-kafka/issues/1101

@sebarys @Ennru,

Sorry for hacking into the mostly closed discussion on this topic. I have a tricky situation where we read files from a file queue and each file has multiple messages. We then store these messages in Kafka for further use. We used Akka Streams with Scala to build the pipeline as below:
filetickerSource --> file processing (rename, read each message, delete file) --> Kafka producer sink (store each successfully read message)

But now we have to handle unpublished messages (in case Kafka goes down): the in-flight messages which the Kafka producer is unable to publish should be written back to the file queue. I could somehow manage to Resume the stream in case of a Kafka TimeoutException, but could not get a handle on the failed element which I could re-write back to the file queue. This is leading to message loss.

@sebarys,
What I gather from the above discussion is that you could somehow solve it using your hacky solution. I just wanted some more insight on it:

  1. Did you test it for backpressure scenarios?
  2. Are you still facing the issue of stream termination when Kafka goes down?

Hi @sebatin,

  1. In case of Kafka errors the stream will be recreated; your own logic errors (e.g. a dependency couldn’t handle the traffic that you’re producing) you can handle by adding an artificial delay, e.g. using the delayWith operator (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/delayWith.html)
  2. We’re using restarting with backoff to handle stream recreation on failures. Something similar to:
    import concurrent.duration._
    val restartableProcessingStream = RestartSource
      .withBackoff(minBackoff = 100.milliseconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
        processingStream
          .log("kafka-event-processor")
          .withAttributes(Attributes.logLevels(Logging.DebugLevel, Logging.InfoLevel, Logging.ErrorLevel))
      }

    restartableProcessingStream
      .toMat(Sink.ignore)(Keep.right)
      .mapMaterializedValue(_ => NotUsed)

Hi @sebarys,

Thanks for the explanation, very much appreciated. We were looking for a solution where, in case of Kafka unavailability, the messages already picked up by the producer are re-written back to the file queue and the stream is ended/restarted, or restarts are delayed with a backoff operator. In both cases I don’t find any means of getting a handle on the messages which have already been picked up by the producer. It looks like I need to find some alternative solution until this is supported.

I think it could be doable with two streams, in the approach described in this thread:

  1. First stream
  • has the file queue as its Source
  • each element is published to the Kafka publisher stream with an attached CompletableFuture
  • interprets the processing result (the result of the CompletableFuture); in case of error, publishes the element back to the file queue
  2. Second stream
  • receives the elements published by the first stream
  • processes them
  • publishes to Kafka
    – in case of failure, completes the attached CompletableFuture with failure to notify the first stream about the failure
    – in case of success, completes the attached CompletableFuture with success

It looks a bit hacky, but in theory I think such a solution should work; a rough sketch of the glue code is below.
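A rough sketch of the glue on the first stream’s side (Scala, with a hypothetical MessageToPublish envelope and a Promise in place of the CompletableFuture; the queue is assumed to be the materialized SourceQueue of a publisher stream like the ones shown earlier in this thread):

import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueueWithComplete
import scala.concurrent.{ExecutionContext, Future, Promise}

sealed trait PublicationResult
case object Published extends PublicationResult
case object FailedPublication extends PublicationResult

// Hypothetical envelope: the payload plus the Promise that the publisher stream completes.
final case class MessageToPublish(payload: String, result: Promise[PublicationResult])

// "First stream" side: offer the element to the publisher stream's queue and
// interpret the publication result once the second stream completes the Promise.
def publish(queue: SourceQueueWithComplete[MessageToPublish], payload: String)(
    implicit ec: ExecutionContext): Future[PublicationResult] = {
  val promise = Promise[PublicationResult]()
  queue.offer(MessageToPublish(payload, promise)).flatMap {
    case QueueOfferResult.Enqueued => promise.future                       // second stream decides
    case _                         => Future.successful(FailedPublication) // dropped or queue closed
  }
}

The first stream can then call publish inside a mapAsync and, when the result is FailedPublication, write the element back to the file queue.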

Hi @sebarys,

Thanks for your input. I like the idea of the two-stream approach; I will try to implement it. Since we are using Scala, I believe a Scala Future itself is sufficient instead of a CompletableFuture. Also, let me know if you have any examples of working with multiple streams. Thanks a ton :slight_smile:

Hi @sebarys, @ennru
I have tried the two-stream approach (suggested by @sebarys) in order to overcome message loss due to Kafka unavailability. I’m able to handle failed messages successfully :slight_smile:
But now I’m facing a bigger issue: backpressure isn’t working for me. Once Kafka goes down, I expect the first stream to stop picking up messages for processing. Can you guys help me see where I’m going wrong? :frowning:

Here are the code snippets:

implicit val system: ActorSystem = ActorSystem()
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

// First stream to pick a file, read content & send to Kafka
val source = Source(dataDirectory.list().toList)
val tickerSource = Source
  .tick(1.second, (10000).millis, ())
  .mapConcat(_ =>
    Paths
      .get("directory_path_to_list_files")
      .toFile
      .listFiles
      .filter { f => f.isFile && f.getName.endsWith(".txt") }
      .toList
  )

tickerSource
  .flatMapConcat(file => {
    FileIO.fromPath(file.toPath)
      .map(_.decodeString("UTF-8"))
      .map(s => {
        println(s"Read Element: ${s}")
        produceToKafka(Future(s)).onComplete {
          case Success(r) =>
            println(s"Success Future ${r} : ${s}")
          case Failure(NonFatal(e)) =>
            println(s"Something went wrong when reading the element: ${s} :Exception: ${e}")
            // re-write to file queue
        }
      })
  })
  .toMat(Sink.foreach(println))(Keep.right).run()

// Stream-2: Independent stream for sending messages to Kafka
def produceToKafka(msg: Future[String]): Future[Done] = {
  Source.future(msg)
    .map { elem =>
      println("Elements produced: " + elem)
      new ProducerRecord[Array[Byte], String]("test", elem)
    }
    .runWith(Producer.plainSink(producerSettings))
}

@sebarys,
Is it possible to share a skeleton implementation of the two streams you mentioned in your post?
I see you have shared part of the code which does the Kafka publishing. I just want to take a look at the other stream which uses it, to compare it with my approach and see why backpressure is not working with my code snippet. I am not sure whether it is something to do with the Source type which I’m using in my Kafka publishing stream.

Hi,

I was using the flow mentioned in this thread to integrate with legacy applications written in e.g. RxJava, so I haven’t implemented something that corresponds to your first part.
What I suggested, and what could theoretically work to simulate something resembling backpressure, is using delayWith (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/delayWith.html), which will slow down your first stream in case of errors - if you receive an error in the next step, a delay can be applied.

For such a feature you would need to treat publication as part of the first stream’s logic - produceToKafka should be called in a mapAsync, and you need to interpret the result and pass it downstream (map/recover instead of onComplete, which only runs as a side effect). Pseudo code:

.mapAsync(parallelism)(el =>
  produceToKafka(el)
    .map(_ => PublicationSuccess(el))
    .recover { case NonFatal(ex) =>
      // on error: convert to PublicationFailure and re-write `el` to the file queue here
      PublicationFailure(el)
    })
.delayWith(
  () =>
    new DelayStrategy[PublicationResult] {
      override def nextDelay(elem: PublicationResult): FiniteDuration = elem match {
        case _: PublicationSuccess => Duration.Zero // if PublicationSuccess: no delay
        case _: PublicationFailure => 5.seconds     // if PublicationFailure: apply a delay
      }
    },
  DelayOverflowStrategy.dropBuffer)

The above is just an idea that came to my mind; maybe there is a more elegant way to do it, but unfortunately I don’t have that broad a knowledge of Akka Streams.

Hi @sebarys,
Thanks a lot for the pseudo code. I will try this and see whether backpressure works; this really helps :slight_smile: For now we have temporarily come up with a bookkeeping solution to track the processed files (renaming them after a successful publish to Kafka; files whose publish to Kafka failed are left with the old extension and will be reprocessed later).
It would have been better if this was supported by the streaming Producer API itself (as you stated earlier) for a cleaner solution. Keeping my fingers crossed for the upcoming releases :slight_smile: