Alpakka amqp Synchronous rpc


(Nagabhushan) #1

Hi,

I am trying to implement a synchronous rpc, following the sample in this page:


I am using Java , RabbitMQ and alpakka version akka-stream-alpakka-amqp_2.12.
I am facing some problems:

  1. Firstly, in the method publishAndConsumeRpc(), the api OutgoingMessage.create() doesnt exist. So I used this code, but this also doesnt work. The assert(expectNextUnordered) fails with my code. So am I reading response from the wrong replyTo queue?
        String res = null;
        // #run-rpc-flow
        try {
            res = result.first().toCompletableFuture().get(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        final String routingKey = res;

        Sink<OutgoingMessage, CompletionStage<Done>> amqpSink =
                AmqpSink.createReplyTo(AmqpReplyToSinkSettings.create(connectionProvider));

        amqpSource
                .map(
                        b ->
                                OutgoingMessage.apply(b.bytes().concat(ByteString.fromString("a")),
                                        false, false, Option.apply(b.properties()), Option.apply(routingKey)))
                .runWith(amqpSink, materializer);
  1. Secondly, instead of using TestSink can I use a regular sink?. Basically I want to read the message returned in the “replyto” queue. But I am unable to find the right api to read the response.

I would really appreciate any help.


(Arnout Engelen) #2

[quote=“Nagabhushan, post:1, topic:2312, full:true”]alpakka version akka-stream-alpakka-amqp_2.12. I am facing some problems:

  1. Firstly, in the method publishAndConsumeRpc(), the api OutgoingMessage.create() doesnt exist.[/quote]

akka-stream-alpakka-amqp_2.12 is the name of the artifact, what version are you using?


(Nagabhushan) #3

Hello,

Thanks for the quick response. You are right, I was using an old version 0.20. Now i changed it to 1.0-M1 and my code is working fine. I am able to get the response in replyto queue as well.

Thank you very much!