Alpakka ActiveMQ request/response with pooled temporary queues

(Mrbald) #1

I am thinking of migrating some JMS-based legacy to Alpakka. One of widely used patterns in the code is request/response with temporary queue (JMSReplyTo). Is it possible with Alpakka out of the box? If not, what is the best direction to take in the implementation, in order to leverage as much as possible of Alpakka ActiveMQ glue?

(Enno) #2

I’m not familiar with the use of temporary JMS queues for request/reply. Can you explain it in more detail, please?

Cheers,
Enno.

(Mrbald) #3

Temp queue is “attached” to the connection that created it, it dies automatically when the connection dies, which makes it a perfect candidate for “client inbox” (client creates one temp queue on startup and requests services to reply into that queue).
But it also makes it a bit not inline with the API of the Alpakka, as the latter takes a connection factory
as an argument and manages connections/sessions/etc. internally without exposing them at a message creation point and also it has connection-per-stream and one cannot refer to a temp queue created in one connection (owned by the request sender JmsConsumer) inside another one (owned by response receiver JmsProducer).

The whole workflow would look like this:

Client:

val connection = factory.createConnection(...)
val session = connection.createSession(...)
val inbox = session.createTemporaryQueue(...) // this dies when client dies and has unique name
val producer = session.createProducer(...) // request producer
val consumer = session.createConsumer(inbox) // response consumer

...
val request = session.createBytesMessage()
request.setJmsReplyTo(inbox)
request.setJmsCorrelationId(...)
producer.send(request) // send a request
...
val response = consumer.receive(...) // receive a response

Service:

val connection = factory.createConnection(...)
val session = connection.createSession(...)
val inbox = session.createQueue("me.service.request") // non-transient queue
val consumer = session.createConsumer(inbox) // request consumer 
val producer = session.createProducer() // response producer
...
val request = consumer.receive(...) // receive a request
...
val response = session.createBytesMessage()
response.setJmsReplyTo(request.getJmsReplyTo())
response.setJmsCorrelationId(request.getJmsCorrelationId())
producer.send(response) // send a response

What would help is if Alpakka JMS had a specialized bidiflow for this kind of request/response, so both inbound and outgoing channels would sit on top of the same JMS Connection.

(Enno) #4

I’m not sure it is possible with the current APIs.

Have you noticed the CustomDestination API which may create the temporary queue?