Alpakka ActiveMQ request/response with pooled temporary queues

I am thinking of migrating some JMS-based legacy code to Alpakka. One of the widely used patterns in that code is request/response with a temporary queue (JMSReplyTo). Is this possible with Alpakka out of the box? If not, what is the best direction to take in the implementation in order to leverage as much of the Alpakka ActiveMQ glue as possible?

I’m not familiar with the use of temporary JMS queues for request/reply. Can you explain it in more detail, please?

Cheers,
Enno.

A temp queue is “attached” to the connection that created it and dies automatically when that connection dies, which makes it a perfect candidate for a “client inbox”: the client creates one temp queue on startup and asks services to reply into that queue.
But that also puts it somewhat out of line with the Alpakka API. Alpakka takes a connection factory as an argument and manages connections, sessions, etc. internally without exposing them at the point where a message is created. It also uses a connection per stream, and one cannot refer to a temp queue created in one connection (owned by the request sender JmsConsumer) inside another one (owned by response receiver JmsProducer).

The whole workflow would look like this:

Client:

val connection = factory.createConnection(...)
val session = connection.createSession(...)
val inbox = session.createTemporaryQueue() // unique name; dies when the client's connection dies
val producer = session.createProducer(...) // request producer
val consumer = session.createConsumer(inbox) // response consumer
connection.start() // required before the consumer can receive

...
val request = session.createBytesMessage()
request.setJMSReplyTo(inbox) // tell the service where to reply
request.setJMSCorrelationID(...)
producer.send(request) // send a request
...
val response = consumer.receive(...) // receive a response

Service:

val connection = factory.createConnection(...)
val session = connection.createSession(...)
val inbox = session.createQueue("me.service.request") // non-transient queue
val consumer = session.createConsumer(inbox) // request consumer
val producer = session.createProducer(null) // response producer with no fixed destination
connection.start() // required before the consumer can receive
...
val request = consumer.receive(...) // receive a request
...
val response = session.createBytesMessage()
response.setJMSCorrelationID(request.getJMSCorrelationID) // lets the client match reply to request
producer.send(request.getJMSReplyTo, response) // send the response to the client's inbox
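
Because one temporary queue serves as the inbox for every request the client sends, the client has to match each reply to its request by correlation ID. Here is a minimal sketch of that bookkeeping in plain Scala. PendingRequests is a hypothetical helper invented for illustration, not part of JMS or Alpakka, and the JMS calls are only mentioned in comments.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, Promise}

// Hypothetical helper (not part of JMS or Alpakka): tracks in-flight requests
// so replies arriving on one shared inbox can be routed to the right caller.
final class PendingRequests[A] {
  private val pending = TrieMap.empty[String, Promise[A]]

  // Call before sending: returns the correlation ID to put on the request
  // (via setJMSCorrelationID) and a Future completed when the reply arrives.
  def register(): (String, Future[A]) = {
    val correlationId = UUID.randomUUID().toString
    val promise = Promise[A]()
    pending.put(correlationId, promise)
    (correlationId, promise.future)
  }

  // Call for every message read from the inbox, passing its getJMSCorrelationID
  // (which may be null). Returns false for unknown or duplicate replies.
  def complete(correlationId: String, reply: A): Boolean =
    Option(correlationId).flatMap(id => pending.remove(id)) match {
      case Some(promise) => promise.trySuccess(reply)
      case None          => false
    }

  // Number of requests still waiting for a reply.
  def size: Int = pending.size
}
```

The client would call register() before producer.send(request) and complete(...) for each message taken from the inbox consumer. A real client would probably also want a timeout that removes entries whose reply never arrives.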

What would help is if Alpakka JMS had a specialized BidiFlow for this kind of request/response, so that both the inbound and outbound channels would sit on top of the same JMS Connection.

I’m not sure it is possible with the current APIs.

Have you noticed the CustomDestination API which may create the temporary queue?

Notice this bit:

one cannot refer to a temp queue created in one connection (owned by the request sender JmsConsumer) inside another one (owned by response receiver JmsProducer).

I just tried CustomDestination myself. Since request/reply with temp queues is such a common thing to do with JMS, I thought there would be some way to do it, but it seems there isn’t. Pulling the jms.Destination from the message (with .getJMSReplyTo) and setting it as the destination of the response message doesn’t work, presumably because the producer uses a different connection/session.

So perhaps it’s the wrong tool for the job, but this makes Alpakka JMS a non-starter for me. I was hoping to get a stream interface that would make it easier to (hopefully) pull out JMS altogether at a later date.

Thank you for trying the CustomDestination idea.

JMS has a lot of these rules around it, and Alpakka JMS has a hard time making them fit the streaming approach. It might be interesting to see if there is a way to make consumers and producers share sessions.

It would be great if you could add an issue in Alpakka describing the requested feature and the current limitations.

Cheers,
Enno.

Sorry! I was very wrong. It actually works, and I can probably now stop the stream implementation I was working on for reactivemq. At least using ActiveMQ as the JMS implementation, the request/response with temp queues works fine across the different sessions. My derp was that I was mapping the JmsMessage to add the CustomDestination after the producer flow, not before it. I’m not sure about other JMS implementations, but it seems as though everything is good on my end.

Thanks

Great!

Would you care to share some example code? It would be great if we can add it to the Alpakka JMS docs.

Cheers,
Enno.

Not sure exactly what you’re looking for, so here’s the test code I did:

  import akka.NotUsed
  import akka.stream.alpakka.jms._
  import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsProducer}
  import akka.stream.scaladsl.{Flow, Sink}
  import javax.jms

  // Assumes an ActorSystem `system`, a materializer and a jms.ConnectionFactory `connFact` in scope.
  // Builds the reply: the reversed text, addressed to the request's JMSReplyTo.
  val responses: Flow[jms.TextMessage, JmsTextMessage, NotUsed] = Flow[jms.TextMessage].map { tm =>
    // The destination has to be set here, before the message reaches the producer flow.
    val dest = CustomDestination("", _ => tm.getJMSReplyTo)
    val base = JmsTextMessage(tm.getText.reverse).to(dest)
    // Copy the correlation ID if the request carried one.
    val corrIdOpt = Option(tm.getJMSCorrelationID).map(JmsCorrelationId(_))
    corrIdOpt.fold(base)(base.withHeader)
  }

  JmsConsumer(JmsConsumerSettings(system, connFact).withQueue("test").withBufferSize(0))
    .collect { case m: jms.TextMessage => m }
    .via(responses)
    // The per-message destination takes precedence over the queue configured here.
    .via(JmsProducer.flow[JmsTextMessage](JmsProducerSettings(system, connFact).withQueue("ignored")))
    .to(Sink.ignore)
    .run()

On a slightly related note, are there any methods to convert a jms.Message to/from an akka.stream.alpakka.jms.JmsMessage while keeping JMS properties and headers intact? Something like https://github.com/CodeMettle/reactivemq/blob/41f5173e52e0e1d62ce4df4c2d8cf0ecd5d23bb9/src/main/scala/com/codemettle/reactivemq/model/AMQMessage.scala#L98 ?

Thanks for sharing.

No, there is no such constructor, but it would make a great addition for this use case. Would you be able to suggest an implementation in a Pull Request?

:+1: I’ll take a stab

As I stated in the PR, this is a first pass. I can make changes as time permits.

Edit: I’ve added some preliminary comments with some thoughts I have about various bits