Alpakka JMS connector restart behaviour

Hi,

After a restart of the JMS server, my JMS connector application (ProcessingApp) fails with:

ERROR | Error during postStop in [akka.stream.alpakka.jms.JmsProducerStage@39db03cd]: Cannot send, channel has already failed: tcp://127.0.0.1:8888
javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:8888
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1310)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1302)
    at org.apache.activemq.ActiveMQSession.doClose(ActiveMQSession.java:666)
    at org.apache.activemq.ActiveMQSession.close(ActiveMQSession.java:657)
    at akka.stream.alpakka.jms.JmsSession.closeSession(JmsConnector.scala:99)

or sometimes with:

WARN | Restarting graph due to failure. stack_trace:
javax.jms.JMSException: java.io.EOFException
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1952)
    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1971)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345)
    at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
    at java.lang.Thread.run(Thread.java:745)

This happens despite having a Supervision.Decider and a RestartSource in place.

How to reproduce:
1. Start JMSServer
2. Start ProcessingApp
3. Restart JMSServer

I found a similar reported issue:


where the code also fails at akka.stream.alpakka.jms.JmsSession.closeSession(JmsConnector.scala:99).

Any thoughts or workarounds?

Kind regards
Paul

The stream collapses on broker restart because the source itself fails: internally, the lost connection results in a JMSException, which fails the source. The decider cannot restart the source itself. You may want to look into wrapping the source in a RestartSource. Please see https://docs.akka.io/docs/akka/current/stream/stream-error.html?language=scala#delayed-restarts-with-backoff-operator

Hi Akara,

Thanks for your reply. I actually do use a RestartSource in my reproducer class ProcessingApp:

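  // Imports assume the Alpakka JMS version in use here, where JmsConsumer yields a KillSwitch
  import akka.NotUsed
  import akka.stream.KillSwitch
  import akka.stream.alpakka.jms.{AcknowledgeMode, JmsConsumerSettings}
  import akka.stream.alpakka.jms.scaladsl.JmsConsumer
  import akka.stream.scaladsl.{RestartSource, Source}
  import javax.jms.Message
  import scala.concurrent.duration._

  // connectionFactory is assumed to be defined elsewhere in ProcessingApp
  val jmsConsumerSource: Source[Message, KillSwitch] = JmsConsumer(
    JmsConsumerSettings(connectionFactory)
      .withTopic("test-topic")
      .withBufferSize(10)
      .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
  )

  // This does not have the desired effect
  val jmsConsumerSourceRestartable: Source[Message, NotUsed] = RestartSource.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () => jmsConsumerSource }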

Hmm, I am a bit lost on this one.

Thanks
Paul

I found a workaround, which works for Apache ActiveMQ.

The brokerURL in the client can be configured like this:

failover:tcp://127.0.0.1:8888

The “failover:” prefix instructs the ActiveMQ client to reconnect after a network failure. If I now restart the JMSServer, the log looks like this:

WARN | Transport (tcp://127.0.0.1:8888) failed , attempting to automatically reconnect: {}
java.io.EOFException
   at java.io.DataInputStream.readInt(DataInputStream.java:392)
   at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
   at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
   at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
   at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
   at java.lang.Thread.run(Thread.java:745)
INFO | Successfully reconnected to tcp://127.0.0.1:8888

I have updated my reproducer accordingly.
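
For anyone who wants to tune the reconnect behaviour, here is a minimal sketch of how such a failover URL could be assembled. The FailoverUrl object and its build method are hypothetical helpers, not part of ActiveMQ or Alpakka. The option names initialReconnectDelay and maxReconnectAttempts come from the ActiveMQ failover transport; the values shown are only illustrative assumptions.

```scala
// Hypothetical helper (not an ActiveMQ or Alpakka API): builds a failover broker URL
// of the form failover:(uri1,uri2)?option=value&option=value
object FailoverUrl {

  def build(brokers: Seq[String], options: Map[String, String] = Map.empty): String = {
    require(brokers.nonEmpty, "at least one broker URI is required")
    // Wrap the comma-separated broker URIs in the failover:( ... ) composite form
    val base = brokers.mkString("failover:(", ",", ")")
    if (options.isEmpty) base
    else {
      // Sort the options by name so the resulting URL is stable across runs
      val query = options.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }
      base + query.mkString("?", "&", "")
    }
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values: wait 1000 ms before the first reconnect, retry without limit
    val url = build(
      Seq("tcp://127.0.0.1:8888"),
      Map("initialReconnectDelay" -> "1000", "maxReconnectAttempts" -> "-1")
    )
    println(url)
  }
}
```

The resulting string would then be passed as the broker URL to the ActiveMQConnectionFactory in place of the plain tcp:// address. Whether these particular option values suit a given setup is something to check against the ActiveMQ failover transport documentation.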

Links:


http://activemq.apache.org/how-can-i-support-auto-reconnection.html

Regards
Paul