JmsConsumer does not close connection on stream restart

Greetings all,

I ran into a problem using akka-stream-alpakka-jms_2.13-5.0.0.

The problem

Given a ConnectionFactory configured with a clientId (required because this is a durable topic subscription):

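import org.apache.activemq.ActiveMQConnectionFactory

// A fixed client ID is needed so the broker can identify the durable subscriber
val connectionFactory = new ActiveMQConnectionFactory()
connectionFactory.setBrokerURL("vm://localhost:8000")
connectionFactory.setClientID("some-client-id")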

and a JmsConsumerSettings

  JmsConsumerSettings(system, connectionFactory)
    .withDurableTopic("test-topic", "some-subscriber")
    .withSessionCount(1)
    .withAckTimeout(3.seconds)
    .withFailStreamOnAckTimeout(true)

When the ack timeout is reached, the exception is thrown and the stream begins to restart:

  [WARN] [FxTradeBridgeTest-akka.actor.default-dispatcher-26] [RestartWithBackoffSource(akka://FxTradeBridgeTest)] Restarting stream due to failure [1]: akka.stream.alpakka.jms.JmsTxAckTimeout: The TxEnvelope didn't get committed or rolled back within ack-timeout (3 seconds)

When RestartSource kicks in and a new connection is attempted, it fails with InvalidClientIDException, because the broker does not allow multiple connections with the same clientId:

 javax.jms.InvalidClientIDException: Broker: localhost - Client: some-client-id already connected from vm://localhost#4
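
For context, here is a minimal sketch of the kind of wiring that produces the two log lines above. It assumes a transactional source (the JmsTxAckTimeout message mentions a TxEnvelope) wrapped in RestartSource.withBackoff (the logger is RestartWithBackoffSource). The object name and the backoff values are illustrative assumptions, not taken from the failing application.

```scala
import akka.NotUsed
import akka.stream.RestartSettings
import akka.stream.alpakka.jms.{JmsConsumerSettings, TxEnvelope}
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.duration._

object RestartingConsumer {
  // Backoff values are placeholders (assumption), chosen only for illustration.
  val restartSettings: RestartSettings =
    RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2)

  // Each restart invokes the factory again, so a fresh JmsConsumer.txSource
  // is materialized and a new JMS connection is requested from the same
  // ConnectionFactory, with the same clientId.
  def source(settings: JmsConsumerSettings): Source[TxEnvelope, NotUsed] =
    RestartSource.withBackoff(restartSettings) { () =>
      JmsConsumer.txSource(settings)
    }
}
```

With withFailStreamOnAckTimeout(true), any envelope that is neither committed nor rolled back within the ack timeout fails the inner source, which is what triggers the restart and the second connection attempt.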

Analysis

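On stream failure, JmsConnector > finishStop > closeSessions() gets called, but as far as I can tell the connection itself is not closed at that point, so the broker still considers some-client-id connected.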

PS: if at this moment I invoke connection.close() manually via the debugger's evaluator, the stream restarts gracefully and a new connection is created successfully!

Could you please advise / share a workaround?

Thanks a bunch!