I came across some struggle using akka-stream-alpakka-jms_2.13-5.0.0.
given a ConnectionFactory set with a clientId (due to being a durable topic subscription solution)
var connectionFactory = new ActiveMQConnectionFactory() connectionFactory.setBrokerURL("vm://localhost:8000") connectionFactory.setClientID("some-client-id")
and a JmsConsumerSettings
JmsConsumerSettings(system, connectionFactory) .withDurableTopic("test-topic", "some-subscriber") .withSessionCount(1) .withAckTimeout(3.seconds) .withFailStreamOnAckTimeout(true)
when ack timeout is reached, the exception is thrown and the stream begins to restart
[WARN] [FxTradeBridgeTest-akka.actor.default-dispatcher-26] [RestartWithBackoffSource(akka://FxTradeBridgeTest)] Restarting stream due to failure : akka.stream.alpakka.jms.JmsTxAckTimeout: The TxEnvelope didn't get committed or rolled back within ack-timeout (3 seconds)
when RestartSource kicks off and a new connection is attempted, it throws InvalidClientIDException, for it cannot have multiple connections with the same clientId
javax.jms.InvalidClientIDException: Broker: localhost - Client: some-client-id already connected from vm://localhost#4
on stream failure
when JmsConnector>finishStop>closeSessions() gets called
- the session is closed
- the connection remains open
PS: if at this moment I invoke connection.close() via evaluator, the stream is restarted gracefully and a new connection is successfully created!
Could you please advise / share a workaround?
Thanks a bunch!