Slowing down processing on business errors

Hello,

I’m trying to implement a stream where:

  • messages are read from the init topic
  • each message is processed (business logic part)
  • based on the processing result, a message is optionally published
  • the original message is committed

In code it looks like this:

messagesSource
    .mapAsync(parallelismLevel, this::processMessage)
    .log("after-message-processing", loggingAdapter)
    .via(Producer.flexiFlow(producerSettings))
    .map(ProducerMessage.Results::passThrough)
    .toMat(createCommittingSink(100L), Keep.both())
    .mapMaterializedValue(Consumer::createDrainingControl)
    .run(materializer);

As processing a message requires communication with external services, I would like to slow down processing when the upstream service can’t handle the traffic.

Assuming that in this::processMessage I’m calling service A via HTTP and it can return e.g. 429 Too Many Requests, which results in a PleaseSlowDownException, is there a way to tell the upstream stage (in this case the Kafka consumer source) to slow down?

Thanks in advance for your help!

Hi @sebarys,

Kafka is not pushing data into the stream, it’s your stream that’s generating demand.

If the problem arises because you have too many requests in flight on the downstream service, I think the issue can be fixed by tuning the value of parallelismLevel. If you allow too many this::processMessage calls to run in parallel, the downstream API may complain.

If the 429 Too Many Requests is a quota error (more than N messages per minute) then you could use a rate control operator from the Akka Streams base library. I think throttle would do the trick.

Cheers,

Hi @ignasi35

To be more precise: for each element processed in mapAsync, I expect messagesSource will receive a pull request, am I right?
What I want to achieve is to not send a pull signal to messagesSource if a message wasn’t processed properly.

Unfortunately there is no fixed quota for how much the external service can handle. I would like to adapt throughput to its current capacity and, in case of domain errors, have a way to say "please slow down" to messagesSource. Is something like this possible?

As far as I currently understand, using e.g. Supervision.getResumingDecider or any other supervision strategy will not slow down the stream, and I don’t have an idea how else I could handle the exception in mapAsync to get the desired behaviour.
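
One possible approach, sketched here under assumptions rather than taken from the thread: mapAsync only pulls a new element when one of its parallelismLevel slots is free, so a future that retries with a growing delay before completing keeps its slot occupied and holds back demand towards messagesSource. The sketch uses only the JDK (Java 9+). The names BackoffRetry and retryWithBackoff are hypothetical, and PleaseSlowDownException is a stand-in for the exception mentioned above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BackoffRetry {

    /** Hypothetical stand-in for the exception raised on HTTP 429. */
    public static class PleaseSlowDownException extends RuntimeException {
        public PleaseSlowDownException(String message) {
            super(message);
        }
    }

    /**
     * Runs the call and, on PleaseSlowDownException, retries it after a delay
     * that doubles on every attempt. The returned stage completes only when an
     * attempt succeeds, retries run out, or a different error occurs.
     */
    public static <T> CompletionStage<T> retryWithBackoff(
            Supplier<CompletionStage<T>> call, int maxRetries, long initialDelayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, maxRetries, initialDelayMillis, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletionStage<T>> call, int retriesLeft,
                                    long delayMillis, CompletableFuture<T> result) {
        CompletionStage<T> stage;
        try {
            stage = call.get();
        } catch (RuntimeException e) {
            // A synchronous failure is reported through the result as well.
            result.completeExceptionally(e);
            return;
        }
        stage.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Dependent stages wrap failures in CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (cause instanceof PleaseSlowDownException && retriesLeft > 0) {
                // Schedule the next attempt after the current delay, then double it.
                Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                delayed.execute(() -> attempt(call, retriesLeft - 1, delayMillis * 2, result));
            } else {
                result.completeExceptionally(cause);
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated service: answers "429" twice, then succeeds.
        Supplier<CompletionStage<String>> flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new PleaseSlowDownException("429"));
                return failed;
            }
            return CompletableFuture.completedFuture("ok");
        };
        String value = retryWithBackoff(flaky, 5, 50).toCompletableFuture().join();
        System.out.println(value + " after " + calls.get() + " attempts"); // prints: ok after 3 attempts
    }
}
```

In the stream above this could be wired in as mapAsync(parallelismLevel, msg -> BackoffRetry.retryWithBackoff(() -> processMessage(msg), 5, 200)), assuming processMessage returns a CompletionStage. The retry count and delays are placeholder values, and a lower parallelismLevel makes the slowdown reach the source sooner.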

You rarely start with a system that is reactive from end to end, so I’m wondering how you would handle such communication with another application to adapt throughput based on its responses.

The closest I can think of is the Valve example in akka-stream-contrib. It provides you with a handle to pause a stream from the outside.

That might give you some inspiration for how to approach your idea.

Cheers,
Enno.

Thanks! I will look into it.