Problem with backpressure with AMQP committableSource

(Blaye Nicolas) #1

Hi everyone,

I have an Akka stream that reads from a RabbitMQ queue and publishes events back to the same broker without acking. However, the processing inside the stream needs a warm-up phase (cache loading) to become effective, and that warm-up cannot happen before the stream starts, so events get buffered in the meantime.

While testing, I set the buffer size very high and didn't have any problem. But when I set it to a more realistic value, the stream failed with:

```
[ERROR] 2019-04-09 18:34:23.284 [alpakka-akka.actor.default-dispatcher-2] akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 - [error-sink] Upstream failed.
java.lang.RuntimeException: Reached maximum buffer size 10000
```
This is bad because it makes everything crash silently: the failure bypasses the supervisor, which does not restart the stream. But that is a separate issue for another time.

From my understanding, Akka Streams has a backpressure mechanism, and I thought it applied to the Alpakka AMQP connector as well.
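For reference, here is a minimal sketch of the shape of such a stream against the Alpakka AMQP scaladsl (queue names, the connection provider, and the buffer size are illustrative assumptions, not the actual application code; it also needs a RabbitMQ broker to run):

```scala
// Sketch of a committableSource -> republish stream (Alpakka AMQP 1.x-era API).
// Queue names and settings below are hypothetical.
import akka.actor.ActorSystem
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.scaladsl.{AmqpSink, AmqpSource}

implicit val system: ActorSystem = ActorSystem("amqp-example")

val connectionProvider = AmqpLocalConnectionProvider

// bufferSize bounds the source stage's internal buffer; it is also the
// value handed to channel.basicQos as the consumer prefetch count.
val source = AmqpSource.committableSource(
  NamedQueueSourceSettings(connectionProvider, "input-queue"),
  bufferSize = 10000
)

val sink = AmqpSink.simple(
  AmqpWriteSettings(connectionProvider).withRoutingKey("output-queue")
)

// Events are republished without acking the incoming messages, so the
// broker keeps them "unacked" and they accumulate up to the buffer limit.
source
  .map(msg => WriteMessage(msg.message.bytes))
  .runWith(sink)
```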

Am I missing something in the RabbitMQ implementation, or is backpressure not implemented there?

(Martynas Mickevičius) #2

Hi,

the Alpakka AMQP connector does call channel.basicQos(bufferSize), which should limit prefetching to bufferSize elements. Can you reproduce this error in a smaller example? I think this deserves a ticket in the Alpakka issue tracker. Would you mind creating one?
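For context, basicQos is the RabbitMQ consumer-prefetch setting: with manual acking, the broker stops delivering once that many messages are unacknowledged on the channel. A rough sketch of the mechanism using the plain RabbitMQ Java client (not the actual Alpakka source; queue name and handler are hypothetical, and it needs a local broker):

```scala
// Sketch of consumer prefetch with the RabbitMQ Java client.
import com.rabbitmq.client.{AMQP, ConnectionFactory, DefaultConsumer, Envelope}

val factory = new ConnectionFactory()
val connection = factory.newConnection()
val channel = connection.createChannel()

// Limit the number of unacknowledged messages the broker will push to
// this channel. With autoAck = false, delivery pauses once `bufferSize`
// messages are in flight, which is what provides the backpressure.
val bufferSize = 10000
channel.basicQos(bufferSize)

channel.basicConsume("input-queue", /* autoAck = */ false,
  new DefaultConsumer(channel) {
    override def handleDelivery(consumerTag: String, envelope: Envelope,
                                properties: AMQP.BasicProperties,
                                body: Array[Byte]): Unit = {
      // Process the message, then ack it to free a prefetch slot.
      channel.basicAck(envelope.getDeliveryTag, false)
    }
  })
```

If messages are never acked, the prefetch window fills up and stays full, so whether the failure comes from the broker side or the connector's own buffer is worth pinning down in the reproduction.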

(Blaye Nicolas) #3

Thank you for your response.

I will try to reproduce it with a smaller example this week, and I will let you know if I can't.