Throttle stream based on external input

This is a question I’ve asked previously on Stack Overflow.


Looking at the signature of throttle in Akka Streams, I see that it can take a cost function of type (Out) ⇒ Int that computes how much each element passing through the stage counts against the rate limit for elements sent downstream.
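For reference, here is a minimal sketch of the element-based cost function as I understand it. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer); the object name, the method name throttledByLength and the budget of 10 units per second are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ElementCostThrottle {
  // Each word costs as many units as it has characters;
  // the stage allows 10 cost units per second with a burst of 10.
  def throttledByLength(words: List[String]): Source[String, NotUsed] =
    Source(words)
      .throttle(10, 1.second, 10, (w: String) => w.length, ThrottleMode.Shaping)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("element-cost")
    val result = throttledByLength(List("a", "bbb", "cccccc")).runWith(Sink.seq)
    println(Await.result(result, 10.seconds))
    system.terminate()
  }
}
```

The cost here depends only on the element itself, which is exactly the limitation the question is about.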

Is it possible to implement throttling based on external input for the stream with a built-in stage? If not, would it be easier just to implement a custom stage?

I’m expecting something like:

def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: ⇒ Int, mode: ThrottleMode): Repr[Out]

So that we could do something like:

throttle(75 /** 75% CPU **/, 1.second, 80 /** 80% CPU **/, getSinkCPU, ThrottleMode.Shaping) 

What about taking Source.tick for periodic elements and combining that with throttle, but implementing costCalculation so that it refers to some external input to calculate the cost? That would produce a stream of ticks that is throttled by the external input. This stream can then be zipped with the stream that needs to be throttled.

Thanks for the reply @2m. I’m not quite sure what you mean by combining Source.tick and throttle. The costCalculation would still be per element passing through this combined stream though?

Do you have an example of what that might look like?

Yes, costCalculation would be per element. But you need some kind of granularity for checking the current CPU usage.
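A minimal sketch of what that could look like, assuming Akka 2.6 or later. The names permits and throttled, the 10 ms tick interval and the budget of 100 cost units per second are assumptions chosen for illustration, and getSinkCPU is stubbed with a constant where a real CPU reading would go.

```scala
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ExternalThrottle {
  // Permit stream: frequent ticks, each charged whatever the external
  // reading says at that moment (the tick element itself is ignored).
  def permits(externalCost: () => Int): Source[Unit, Cancellable] =
    Source.tick(0.millis, 10.millis, ())
      .throttle(100, 1.second, 100, (_: Unit) => externalCost(), ThrottleMode.Shaping)

  // Pair every data element with one permit, then drop the permit.
  def throttled[T, M](data: Source[T, M], externalCost: () => Int): Source[T, M] =
    data.zip(permits(externalCost)).map(_._1)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("external-throttle")
    // Hypothetical stand-in for a real CPU reading of the sink, in percent.
    def getSinkCPU(): Int = 50
    val result = throttled(Source(1 to 5), () => getSinkCPU()).runWith(Sink.seq)
    println(Await.result(result, 30.seconds))
    system.terminate()
  }
}
```

In this sketch a higher reading makes each permit more expensive, so fewer elements get through per second. The tick interval is the granularity: it bounds how often the reading is sampled and caps the maximum rate at one element per tick.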

Do you see any problem in implementing costCalculation by checking CPU usage instead of calculating cost from an element?

I don’t, but I was hoping that the defined granularity would not dictate the maximum rate of elements passing through the stream. Maybe I’m mistaken, but I was under the impression that combining two streams would make the combined stream progress at the rate of the slower one.

I’m guessing I would have to use something like conflate to let the tick stream catch up if it has to?

Yes, conflate can be used to let a faster upstream continue when a slower downstream cannot accept elements.
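A small sketch of conflate on its own, assuming Akka 2.6 or later. The object and method names are made up, and summing is just one possible way to aggregate; the downstream throttle stands in for a slow consumer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConflateDemo {
  // Fast upstream, slow downstream: while downstream is busy, conflate
  // folds the waiting elements together (here by summing) instead of
  // backpressuring the upstream.
  def summarised(n: Int): Source[Int, NotUsed] =
    Source(1 to n)
      .conflate((acc: Int, el: Int) => acc + el)
      .throttle(1, 100.millis)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("conflate-demo")
    val result = summarised(10).runWith(Sink.seq)
    // How the elements are grouped depends on timing, but the total is preserved.
    println(Await.result(result, 10.seconds))
    system.terminate()
  }
}
```

The number of emitted elements varies from run to run, so only the sum of the output should be relied on.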

There are also alsoTo and wireTap, which send all elements from one flow to another: alsoTo respects the backpressure of the attached sink, while wireTap disregards it.
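A short sketch contrasting the two, assuming Akka 2.6 or later; the object and method names and the printing side sink are only for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SideChannels {
  // alsoTo: the side sink takes part in backpressure, so a slow side sink
  // slows the main flow but receives every element.
  def withAlsoTo(side: Sink[Int, Any]): Source[Int, NotUsed] =
    Source(1 to 5).alsoTo(side)

  // wireTap: the side sink never slows the main flow; elements it cannot
  // accept in time are dropped for the side sink only.
  def withWireTap(side: Sink[Int, Any]): Source[Int, NotUsed] =
    Source(1 to 5).wireTap(side)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("side-channels")
    val side = Sink.foreach[Int](i => println(s"side saw $i"))
    println(Await.result(withAlsoTo(side).runWith(Sink.seq), 10.seconds))
    println(Await.result(withWireTap(side).runWith(Sink.seq), 10.seconds))
    system.terminate()
  }
}
```

In both cases the main stream still carries every element; the difference is only in what the side sink is guaranteed to see.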

Can you elaborate your use-case a bit more?