I’m trying implement stream where:
- messages are read from init topic
- each message is processed (business logic part)
- based on processing result optionally publish message
- commit original message
In code it looks like below:
messagesSource .mapAsync(parallelismLevel, this::processMessage) // .log("after-message-processing", loggingAdapter) .via(Producer.flexiFlow(producerSettings)) .map(ProducerMessage.Results::passThrough) .toMat(createCommittingSink(100L), Keep.both()) .mapMaterializedValue(Consumer::createDrainingControl) // .run(materializer);
As processing message require communication with external services I would like slow down processing in case when upstream service couldn’t handle traffic.
Assuming that in
this::processMessage I’m calling service A via HTTP and it can return e.g. 429 Too Many Requests that will result in
PleaseSlowDownException is there a way to say downstream (in this case Kafka consumer source) to slow down?
Thanks for help in advance!