Alpakka JMS connection caching

Hi, I have a question about JMS connection caching in Alpakka JMS. I use Alpakka JMS in combination with Oracle AQ to forward requests received by an Akka HTTP endpoint to an AQ queue. The code is basically as follows:

val jmsSink = {
  val dataSource: javax.sql.DataSource = new OracleDataSource() // with config of course
  val jmsConnectionFactory = AQjmsFactory.getConnectionFactory(dataSource)
  JmsProducer.sink(JmsProducerSettings(system, jmsConnectionFactory).withQueue("myQueue"))
}

  pathPrefix("myendpoint") {
    post {
      entity(as[MessageEnvelope]) { envelope =>
        onSuccess(send(envelope))
      }
    }
  }

  def send(envelope: MessageEnvelope) = {
    Source.single(JmsByteMessage(envelope.toXML.getBytes())).runWith(jmsSink).map(_ => "OK")
  }

The problem is that Alpakka creates a new JMS connection for each request (through the JmsConnectionFactory in akka.stream.alpakka.jms.impl.JmsConnector). This eventually causes the Oracle database to run out of resources, since every new JMS connection translates to a new database connection in AQ.

I have two questions:

  1. How can I prevent the connection leakage described above? In other words, how can I configure a sink that reuses an existing JMS connection? (From what I’ve seen so far in the source code I do not expect that to be possible, but why not ask…)
  2. Are there plans to introduce a CachingConnectionFactory similar to what the Spring framework offers? (See: https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/connection/CachingConnectionFactory.html)

Thank you for your reply in advance!

Hi @upeter,

JMS is a tricky beast when it comes to session and connection handling.

With the code in send above, you establish a new stream, and thus a new session and connection, for every message.
Instead, you should create one stream that keeps running and funnel your messages into it. If the rest of your application isn’t built in a streaming fashion yet, you could use Akka’s Source.queue and its offer method to put the messages into the stream.

Depending on where the messages in your application originate, you might want to use Akka Streams the whole way to get all of its benefits.

Cheers,
Enno.

Hi Enno,

Thanks for your reply. I’m using Akka HTTP as the REST layer. Akka HTTP is basically stream-based, but only per request (I changed my initial sample to clarify that). Since I’m forwarding HTTP requests from different clients to a single JMS AQ queue, I cannot bind the jmsSink to the Akka HTTP request stream directly.

Your suggestion to use Source.queue, however, seems promising. It would then look as follows:

  val jmsSink = JmsProducer.sink(JmsProducerSettings(system, jmsConnectionFactory).withQueue(jmsConfig.queue))

  val queue = Source
    .queue[MessageEnvelope](100, OverflowStrategy.backpressure)
    .map(envelope => JmsByteMessage(envelope.toXML.getBytes()))
    .toMat(jmsSink)(Keep.left)
    .run()

  pathPrefix("myendpoint") {
    post {
      entity(as[MessageEnvelope]) { envelope =>
        onSuccess(send(envelope))
      }
    }
  }

  def send(envelope: MessageEnvelope): Future[String] = {
    queue.offer(envelope).map(_ => "OK") // simplified; correct handling of QueueOfferResult would be needed
  }
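
The comment above notes that QueueOfferResult still needs proper handling. As a minimal sketch of what that could look like, the helper below maps each possible result to either the "OK" reply or an error. The name offerOutcome and the error messages are assumptions made for illustration; they are not part of Akka or Alpakka.

```scala
import akka.stream.QueueOfferResult
import scala.util.{Failure, Success, Try}

object OfferHandling {
  // Hypothetical helper (not part of Akka or Alpakka): translates the result
  // of queue.offer into the "OK" reply or an error the route can report.
  def offerOutcome(result: QueueOfferResult): Try[String] = result match {
    // The element was accepted into the queue's buffer.
    case QueueOfferResult.Enqueued =>
      Success("OK")
    // The element was discarded because of the configured overflow strategy.
    case QueueOfferResult.Dropped =>
      Failure(new IllegalStateException("message dropped: buffer full"))
    // The stream has completed, so no further elements are accepted.
    case QueueOfferResult.QueueClosed =>
      Failure(new IllegalStateException("stream completed: queue closed"))
    // The stream failed; pass the original cause on to the caller.
    case QueueOfferResult.Failure(cause) =>
      Failure(cause)
  }
}
```

With this in place, send could become queue.offer(envelope).flatMap(result => Future.fromTry(OfferHandling.offerOutcome(result))), so that anything other than Enqueued fails the Future and the route can answer with an error instead of "OK". Depending on the Akka version, an offer made while an earlier one is still pending under OverflowStrategy.backpressure may itself return a failed Future, so that case probably deserves handling as well.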

By doing so, the JMS connection would indeed be reused. Multiple JMS sessions could also be used to increase performance.
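
For the multiple-sessions part, a sketch of how the producer settings might be extended, assuming an Alpakka JMS version whose JmsProducerSettings offers withSessionCount. The object name ProducerConfig and the value 4 are illustrative assumptions only.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.jms.JmsProducerSettings
import javax.jms.ConnectionFactory

object ProducerConfig {
  // Hypothetical helper: builds producer settings that use several JMS
  // sessions on the one connection held by the long-running stream.
  def settings(
      system: ActorSystem,
      connectionFactory: ConnectionFactory,
      queueName: String
  ): JmsProducerSettings =
    JmsProducerSettings(system, connectionFactory)
      .withQueue(queueName)
      .withSessionCount(4) // arbitrary example value, not a recommendation
}
```

The result would be passed to JmsProducer.sink in place of the settings used above. Note that sending through several sessions in parallel can mean messages reach the broker out of order, so this only fits if ordering on the AQ queue does not matter.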

I’ll give it a try. Thanks once more!

Urs