Alpakka AMQP Consuming


Akka-Streams are quite new to me and I wanted to get some hands-on experience. I was thinking of setting up a Qpid broker and building a Producer and Consumer for it.

While looking at Alpakka-AMQP, I noticed that there doesn’t seem to be (at first glance) a way to setup a listener, which will receive messages from the broker once those have been published by the Producer.

I can successfully run the example from the documentation but once I consume the messages, the connection gets closed and the consumer is no longer listening. Specifically, I refer to the example about receiving:

val amqpSource = AmqpSource.atMostOnceSource(
  NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
  bufferSize = 10

val result = amqpSource.take(5).runWith(Sink.seq)

I am most likely missing something and will appreciate your help.

Thank you for your time!

Hi Toni, the take(n) operator will accept at most n elements before it cancels upstream, so if you want it to continuously process elements from the topic you should use take. Additionally Sink.seq will collect all elements into memory so that would be a pretty bad idea if the stream is “infinite”.

The collect finite number of elements logic in the sample is likely to keep the sample as simple as possible.

Another basic example that will keep running “forever” (until you terminate the ActorSystem backing it), and print each element, would be:

Source(Nil).runWith(Sink.foreach(elem => println(elem)))

Hi Johan,

Thank you for your reply.

I was unable to find take in the API (I am using akka-stream 2.5.16). The second suggestion worked though!

Would this be expensive - to keep the stream open forever?

I was referring to the sample code there, that uses the take operator in amqpSource.take(5).runWith(Sink.seq), it is avalabile on both Source and Flow

In general I’d recommend to read up on the basic concepts, Akka Streams can be hard to trial-and-error-learn. It will likely save you time and grief. The quickstart in the docs for example: (there are also two more learning paths in the docs )

Running a stream “forever” is one of the most common use cases for using Akka Streams, so this is a fine idea.

Thanks, Johan, I will go through the documentation.