JmsConsumer does not close connection on stream restart

Greetings all,

I ran into a problem using akka-stream-alpakka-jms_2.13-5.0.0.

The problem

given a ConnectionFactory configured with a clientId (required because this is a durable topic subscription)

var connectionFactory = new ActiveMQConnectionFactory()
connectionFactory.setClientID("some-client-id")

and a JmsConsumerSettings

  JmsConsumerSettings(system, connectionFactory)
    .withDurableTopic("test-topic", "some-subscriber")

when the ack timeout is reached, an exception is thrown and the stream begins to restart

  [WARN] [] [RestartWithBackoffSource(akka://FxTradeBridgeTest)] Restarting stream due to failure [1]: The TxEnvelope didn't get committed or rolled back within ack-timeout (3 seconds)
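For context, a minimal sketch of the kind of setup that produces the log line above. This is an assumption about the wiring, not the original code: the broker URL, the backoff values, and the names DurableTxConsumer, settings and restartingSource are hypothetical, and the 3 second ack timeout is taken from the log message.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.alpakka.jms.{JmsConsumerSettings, TxEnvelope}
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{RestartSource, Source}
import org.apache.activemq.ActiveMQConnectionFactory
import scala.concurrent.duration._

object DurableTxConsumer {
  // Hypothetical embedded broker URL and client id, for illustration only.
  def connectionFactory(): ActiveMQConnectionFactory = {
    val cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false")
    cf.setClientID("some-client-id") // a durable subscription needs a fixed client id
    cf
  }

  def settings(implicit system: ActorSystem): JmsConsumerSettings =
    JmsConsumerSettings(system, connectionFactory())
      .withDurableTopic("test-topic", "some-subscriber")
      .withAckTimeout(3.seconds)        // matches "ack-timeout (3 seconds)" in the log
      .withFailStreamOnAckTimeout(true) // fail the stream instead of only rolling back

  // Every restart builds a new txSource, which opens a new JMS connection
  // using the same client id (backoff values are assumed).
  def restartingSource(implicit system: ActorSystem): Source[TxEnvelope, NotUsed] =
    RestartSource.withBackoff(RestartSettings(1.second, 10.seconds, 0.2)) { () =>
      JmsConsumer.txSource(settings)
    }
}
```

With a source like this, any envelope that is neither committed nor rolled back within the ack timeout fails the inner source, and RestartSource then creates a fresh one.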

when RestartSource kicks in and a new connection is attempted, it throws InvalidClientIDException, because the broker does not allow multiple connections with the same clientId

 javax.jms.InvalidClientIDException: Broker: localhost - Client: some-client-id already connected from vm://localhost#4
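The clientId conflict can also be sketched with plain JMS, outside of any stream. This is only an illustration under assumptions: the embedded vm:// broker URL and the names ClientIdConflict and connect are hypothetical, and it relies on ActiveMQ rejecting a second connection while the first one with the same clientId is still open.

```scala
import javax.jms.Connection
import org.apache.activemq.ActiveMQConnectionFactory
import scala.util.Try

object ClientIdConflict {
  // Hypothetical embedded, non-persistent broker, for illustration only.
  private val factory =
    new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false")

  // Opens a connection with the given client id; closes it again if setup fails.
  def connect(clientId: String): Connection = {
    val connection = factory.createConnection()
    try {
      connection.setClientID(clientId)
      connection.start()
      connection
    } catch {
      case e: Throwable =>
        connection.close()
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val first = connect("some-client-id")

    // Second attempt while the first connection is still open.
    val whileOpen = Try(connect("some-client-id"))
    println(s"attempt while first is open: $whileOpen")

    // Closing the first connection releases the client id on the broker.
    first.close()
    val afterClose = Try(connect("some-client-id"))
    println(s"attempt after first.close(): $afterClose")
    afterClose.foreach(_.close())
  }
}
```

The first attempt should fail with InvalidClientIDException and the attempt after close() should go through, which mirrors what the restarted stream runs into when the old connection is still open.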


on stream failure, JmsConnector > finishStop > closeSessions() gets called, but the connection itself appears to remain open

PS: if at this moment I invoke connection.close() via the debugger's expression evaluator, the stream restarts gracefully and a new connection is created successfully!

Could you please advise / share a workaround?

Thanks a bunch!

Hi Bruno,

Thank you for reporting.
Your analysis seems correct: the connection does not get shut down (in time) before the new connection attempt is made.

In JmsTxSourceStage.onSessionOpened, the timeout triggers a rollback() on the JMS session, but it should possibly also tear down the connection, since the stage is about to fail when FailStreamOnAckTimeout is enabled.

I can’t think of a workaround. It would be great to show this problem in an integration test.