Alpakka AMQP multiple consumers

(Toni Andreev) #1

Hi,

I am trying to setup multiple consumers from a Qpid queue using Alpakka AMQP (version M1 at the moment). I use an AmqpCachedConnectionProvider, so I can reuse the connection (internally I create a AmqpDetailsConnectionProvider). My understanding is that each Source is going to create its own channel with its own buffer and then via source.merge, everything will come together. Obtaining a source looks something like the below:

override def getQueueSources(queueName: String,
                             consumerCount: Int,
                             bufferSize: Int): Source[ByteString, NotUsed] = {
    (0 until consumerCount).foldLeft(Source.empty[IncomingMessage]) {
      case (source, _) => source.merge(getRestartingAmqpQueueSource(queueName, bufferSize))
    }.via(incomingMessageToByteString)
}

def getRestartingAmqpQueueSource(queueName: String, bufferSize: Int): Source[IncomingMessage, NotUsed] = {
    RestartSource.withBackoff(
      minBackoff = 5.seconds,
      maxBackoff = 60.seconds,
      randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
    ) {
      () => getAmqpQueueSource(queueName, bufferSize)
    }
}

def getAmqpQueueSource(queueName: String, bufferSize: Int): Source[IncomingMessage, NotUsed] = {
    val queueDeclaration = QueueDeclaration(queueName)
    AmqpSource.atMostOnceSource(
      NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
      bufferSize = bufferSize
    )
}

Now, this kind of approach, no matter the number of consumers, only yields 130m/s in total. It’s worth noting that initially all of the consumers prefetch the number of messages I want them to prefetch (e.g 50 per consumer) but then they start to settle and the total consumption drops to 130m/s.

Initially, I thought that it’s backpressure but it isn’t since when I changed the AmqpCachedConnectionProvider to AmqpDetailsConnectionProvider directly (so 1 connection per consumer rather than 1 channel per consumer), I get 750m/s.

Any ideas why using channels behaves in this way? I would prefer to rely on channels rather than multiple connections since channels are more lightweight.

Thanks!