[AMQP Connector] - problem with passing parameter to the QueueDeclaration using the arguments Map

Hello,

I am using RabbitMQ 3.4.4 and akka-stream-alpakka-amqp" in version “0.18”.
I am experimenting and exception when using the AMQP connector.
Here is the code I use to test the connector:

import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.scaladsl._
import scala.util._

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
import scala.concurrent.ExecutionContext.Implicits.global

val sourceInt = Source(1 to 100).map(s => ByteString(s))
val connProvider = AmqpUriConnectionProvider("amqp://pipeline:abc123g@my-host:5672")
val queueName = "pipe.parser.input"
val queueDeclaration = QueueDeclaration(queueName, 
                                        durable = true,
                                        arguments = Map("x-max-length" -> "100000"))

val amqpSink = AmqpSink.simple(
  AmqpSinkSettings(connProvider)
    .withRoutingKey(queueName)
    .withDeclarations(queueDeclaration))

val fg = sourceInt.runWith(amqpSink)
fg.onComplete {
    case Failure(e) => e.printStackTrace()
    case Success(r) => println(r)
}

And I receive this exception:

scala> java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
        at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:947)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
        at akka.stream.alpakka.amqp.AmqpConnectorLogic.$anonfun$preStart$2(AmqpConnector.scala:47)
        at scala.collection.immutable.List.foreach(List.scala:389)
        at akka.stream.alpakka.amqp.AmqpConnectorLogic.preStart(AmqpConnector.scala:40)
        at akka.stream.alpakka.amqp.AmqpConnectorLogic.preStart$(AmqpConnector.scala:24)
        at akka.stream.alpakka.amqp.AmqpSinkStage$$anon$1.preStart(AmqpSinkStage.scala:58)
        at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:295)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:554)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:676)
        at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:724)
        at akka.actor.Actor.aroundPreStart(Actor.scala:528)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:528)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:667)
        at akka.actor.ActorCell.create(ActorCell.scala:654)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:525)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:547)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
        at akka.dispatch.Mailbox.run(Mailbox.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'pipe.parser.input' in vhost '/': received '100000' but current is '100000', class-id=50, method-id=10)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
        ... 22 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'pipe.parser.input' in vhost '/': received '100000' but current is '100000', class-id=50, method-id=10)
        at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:504)
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
        at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643)
        at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581)
        ... 1 more

As you can see in the exception I am giving the exact same value as the configuration is waiting for.
When not giving the parameter of the configuration x-max-length it complains it needs it as a long.
The problem is that the parameter map of the QueueDeclaration is Map[String,AnyRef] wich makes it difficult to give it that long it request.

Thanks for helping if there is any solution (outside recreating the queue without the x-max-length parameter wich I check would work but is not accecptable in my context).

Note: this was originally an issue (https://github.com/akka/alpakka/issues/894) but I have been advised to put it as a question here.

Many thanks for helping.

The map is passed on to the underlying Java API which just supports Object as value type, here represented as AnyRef.

You’d need to pass it as a java.lang.Long. Eg. via Long.box(100000L).

Enno.

Thanks I will try that.

Samuel