Problem with backpressure with Amqp.committableSource

Hi everyone,

I have an Akka stream that reads from a RabbitMQ broker and publishes events to the same broker without acking. However, the processing inside the stream needs a warm-up (cache loading) to become efficient, and this cannot be done before the stream starts, so events get buffered.

While testing, I set the buffer size really high and had no problems. But when I set it to a more realistic value, the stream failed with:

[ERROR] 2019-04-09 18:34:23.284 [alpakka-akka.actor.default-dispatcher-2] akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 - [error-sink] Upstream failed. java.lang.RuntimeException: Reached maximum buffer size 10000

This is bad because everything crashes silently, bypassing the supervisor, which does not restart the stream, but that is a topic for another time.

From my understanding, Akka Streams has a backpressure mechanism, and I thought it applied to Alpakka AMQP as well.

Am I missing something about the RabbitMQ implementation, or is backpressure not implemented there?

Hi,

The Alpakka AMQP connector does call channel.basicQos(bufferSize), which should limit prefetching to bufferSize elements. Can you reproduce this error in a smaller example? I think this deserves a ticket in the Alpakka issue tracker. Would you mind creating one?
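
To make the expected behaviour concrete, here is a toy model of a prefetch window. It is only a sketch: PrefetchModel and its methods are hypothetical names made up for illustration, and it is neither Alpakka nor RabbitMQ client code. It assumes manual acknowledgements, where the prefetch count caps the number of delivered but not yet acked messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of a channel with a prefetch limit (hypothetical, for illustration only). */
public class PrefetchModel {
    private final Queue<Integer> ready = new ArrayDeque<>(); // messages still waiting in the queue
    private final int prefetch;                               // stands in for the basicQos value
    private int unacked = 0;                                  // delivered but not yet acknowledged

    public PrefetchModel(int prefetch, int queued) {
        this.prefetch = prefetch;
        for (int i = 0; i < queued; i++) {
            ready.add(i);
        }
    }

    /** Delivers messages until the prefetch window is full or the queue is empty. */
    public List<Integer> deliver() {
        List<Integer> out = new ArrayList<>();
        while (unacked < prefetch && !ready.isEmpty()) {
            out.add(ready.remove());
            unacked++;
        }
        return out;
    }

    /** Acknowledges one delivered message, freeing one slot in the window. */
    public void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    public int unacked() {
        return unacked;
    }

    public static void main(String[] args) {
        PrefetchModel channel = new PrefetchModel(3, 10);
        System.out.println(channel.deliver().size()); // 3: the window fills up
        System.out.println(channel.deliver().size()); // 0: nothing more until an ack arrives
        channel.ack();
        System.out.println(channel.deliver().size()); // 1: one slot was freed
    }
}
```

If the connector behaved like this model, the number of in-flight messages could never exceed bufferSize, so an error saying the buffer is full suggests that more messages are arriving than the prefetch limit should allow.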

Thank you for your response.

I will try to reproduce it with a smaller example this week. I will let you know if I can't.

I am facing the same issue with Alpakka AMQP. I am testing a scenario where a stream reads messages from AMQP and processes them further.

It can happen that the AMQP queue keeps receiving messages while the application is down. When the application is restarted and starts consuming messages from AMQP, the queue might already have hundreds of messages waiting to be processed.

If I set bufferSize to a small number, I get this error at startup: java.lang.RuntimeException: Reached maximum buffer size 100

Setting it to a really high number fixes the issue. Is there any other deterministic way to configure the stream so that this error doesn't occur with a reasonable buffer size?