Akka http to alpakka amqp (noob question)

I am using Rabbitmq and akka. I am currently using akka http, alpakka amqp, and akka streams. I have managed to get akka http and alpakka amqp running separately but I want to combine the two. I am completely stuck. I have akka http obtaining all the name of the queues in my rabbitmq api json and then I want alpakka amqp to connect to the queue names obtained. What I am confused about is I have akka http as a Source and the AmqpSource is also a Source. So how to get the obtained string value from the akka http source and then pass it to the parameter of AmqpSource and then make it run. I know I am just mapping but how to make AmqpSource run in the graph. I tried .run() and .runWith() which is probably totally incorrect.

First, I just want to know how to go about connecting to one queue from the data of akka http.
I tried

def connect (queue: String) = {
AmqpSource.atMostOnceSource(NamedQueueSourceSettings(amqpUri, queue),10)

val httptoamqp = obtainallqueuenames .map( _.apply(1).toString()).map( _=> connect(_))

I also did

obtainallqueuenames.map( _.map( NamedQueueSourceSettings(amqpUri, ))).map(.map(AmqpSource.atMostOnceSource(_ , bufferSize = 10))).map(_.apply(1))

httptoamqp ~> sink

I just want to make a connection to rabbitmq. I'm not sure how to go about doing this.


I would like to know how to connect to multiple queues?

Thank you,


welome to the forums. :) Did you check if you actually run the Sources as mentioned in here.

You mean this?
value run is not a member of akka.stream.scaladsl.Source[String => akka.stream.scaladsl.Source[akka.stream.alpakka.amqp.IncomingMessage,akka.NotUsed],scala.concurrent.Future[akka.http.scaladsl.Http.OutgoingConnection]] [error] .map(_.apply(1).toString()).map(_ => connect(_)).run()

The other places I put run() I get “symbol is unresolved”

Ok, I just ended up just creating another runnablegraph in the method. thought there would be a more easier way. ill ask if I have any further questions.

thanks for the help