Hi,
I am trying to set up multiple consumers on a Qpid queue using Alpakka AMQP (version M1 at the moment). I use an AmqpCachedConnectionProvider so that I can reuse the connection (internally I create an AmqpDetailsConnectionProvider). My understanding is that each Source creates its own channel with its own buffer, and that source.merge then brings everything together. Obtaining a source looks something like this:
override def getQueueSources(queueName: String,
                             consumerCount: Int,
                             bufferSize: Int): Source[ByteString, NotUsed] = {
  (0 until consumerCount).foldLeft(Source.empty[IncomingMessage]) {
    case (source, _) => source.merge(getRestartingAmqpQueueSource(queueName, bufferSize))
  }.via(incomingMessageToByteString)
}

def getRestartingAmqpQueueSource(queueName: String, bufferSize: Int): Source[IncomingMessage, NotUsed] = {
  RestartSource.withBackoff(
    minBackoff = 5.seconds,
    maxBackoff = 60.seconds,
    randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
  ) {
    () => getAmqpQueueSource(queueName, bufferSize)
  }
}

def getAmqpQueueSource(queueName: String, bufferSize: Int): Source[IncomingMessage, NotUsed] = {
  val queueDeclaration = QueueDeclaration(queueName)
  AmqpSource.atMostOnceSource(
    NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
    bufferSize = bufferSize
  )
}
Now, with this approach the total throughput is only 130 messages/s, no matter how many consumers I use. It’s worth noting that initially every consumer prefetches the number of messages I want it to prefetch (e.g. 50 per consumer), but then they settle and the total consumption drops to 130 messages/s.
Initially I thought this was backpressure, but it isn’t, since when I switched from AmqpCachedConnectionProvider to using AmqpDetailsConnectionProvider directly (so one connection per consumer rather than one channel per consumer), I got 750 messages/s.
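For reference, a minimal sketch of the two provider setups being compared. The host, port, and the object and value names are placeholders made up for illustration, not my real configuration, and the factory calls are assumed to match the Alpakka AMQP version in use:

```scala
import akka.stream.alpakka.amqp.{
  AmqpCachedConnectionProvider,
  AmqpConnectionProvider,
  AmqpDetailsConnectionProvider
}

// Hypothetical holder object; the names here are illustrative only.
object ConnectionProviders {
  // Placeholder broker address (assumption, not the real Qpid endpoint).
  private val host = "localhost"
  private val port = 5672

  // Variant A: the provider hands out a new connection on every request,
  // so each AMQP source ends up with its own connection.
  val perConsumerConnections: AmqpConnectionProvider =
    AmqpDetailsConnectionProvider(host, port)

  // Variant B: the wrapped provider's connection is cached and shared,
  // so each AMQP source only opens its own channel on that one connection.
  val sharedConnection: AmqpConnectionProvider =
    AmqpCachedConnectionProvider(AmqpDetailsConnectionProvider(host, port))
}
```

Either value can be passed as connectionProvider to NamedQueueSourceSettings in getAmqpQueueSource above; nothing connects to the broker until a source is materialized.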
Any ideas why using channels behaves in this way? I would prefer to rely on channels rather than multiple connections since channels are more lightweight.
Thanks!