Alpakka Mqtt streaming continuos failure

I am having this error show up occasionally using Alpakka MQTT streaming:

akka.stream.alpakka.mqtt.streaming.impl.Subscriber$SubscribeFailed$: null

My setup is:

  • HiveMQ 4.1.1
  • Alpakka-mqtt-streaming_2.12_1.1.0
  • Scala 2.12.8

This is the stack trace:

2019-07-18 08:00:38,343 ERROR a.a.t.i.a.ActorAdapter [default-akka.actor.default-dispatcher-17] null
akka.stream.alpakka.mqtt.streaming.impl.Subscriber$SubscribeFailed$: null
2019-07-18 08:00:38,343 ERROR a.a.t.i.a.ActorAdapter [default-akka.actor.default-dispatcher-17] null
akka.stream.alpakka.mqtt.streaming.impl.Subscriber$SubscribeFailed$: null
2019-07-18 08:00:38,345 DEBUG a.s.a.m.s.i.Subscriber$ [default-akka.actor.default-dispatcher-2] Cancel all timers
2019-07-18 08:00:38,345 DEBUG a.s.a.m.s.i.Subscriber$ [default-akka.actor.default-dispatcher-2] Cancel all timers
2019-07-18 08:00:38,357 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-16] [client-commandFlow] Upstream failed, cause: ActorMqttClientSession$SubscribeFailed$: null
2019-07-18 08:00:38,357 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-16] [client-commandFlow] Upstream failed, cause: ActorMqttClientSession$SubscribeFailed$: null
2019-07-18 08:00:38,358 DEBUG a.a.RepointableActorRef [default-akka.actor.default-dispatcher-14] Aborting tcp connection to [redacted] because of upstream failure: akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$SubscribeFailed$
2019-07-18 08:00:38,358 DEBUG a.a.RepointableActorRef [default-akka.actor.default-dispatcher-14] Aborting tcp connection to [redacted] because of upstream failure: akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$SubscribeFailed$
2019-07-18 08:00:38,359 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-4] [client-events] Upstream failed, cause: ActorMqttClientSession$SubscribeFailed$: null
2019-07-18 08:00:38,359 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-4] [client-events] Upstream failed, cause: ActorMqttClientSession$SubscribeFailed$: null

Hi @idarlington,

thank you for posting this. Looks like a bug in the Alpakka Mqtt Streaming connector. Could you share some context and code of how you are using the connector?

@2m I created the flow with the connection as specified in the documentation. Then created a Source this way:

val (commands, source)  = Source
      .queue(
        mqttBrokerConfig.connectorBuffer,
        OverflowStrategy.backpressure
      )
      .via(mqttFlow)
      .collect {
        case Right(Event(publishEvent: Publish, _)) => publishEvent
      }
      .preMaterialize()

I checked the hivemq logs when it happened and this is what I found:

broker_1               | 2019-07-18 14:57:26,859 DEBUG - Flushing Flusher@7b3c4d7c[queueSize=1,aggregateSize=0,terminated=null]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null] processing 1 entries: [FrameEntry[TEXT[len=2415,fin=true,rsv=1..,masked=false],org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension$Flusher@58044887[PROCESSING],OFF,null]]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null] flushing 1 frames: [FrameEntry[TEXT[len=2415,fin=true,rsv=1..,masked=false],org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension$Flusher@58044887[PROCESSING],OFF,null]]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - write: WriteFlusher@1893f5e1{IDLE}->null [DirectByteBuffer@137b706c[p=0,l=4,c=1024,r=4]={<<<\xC1~\to>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00},HeapByteBuffer@4f766ff0[p=0,l=2415,c=2419,r=2415]={<<<\xEc\x9c]\x8f\xDbD\x14\x86\xFf\x8a\x15\t\t\xA4\xA8x<...\x0b\x91q\x7f\xDc\xAd\xBb_\xA8\xA1\xDa}\xD7\xFe\x04>>>\x00\x00\xFf\xFf}]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - update WriteFlusher@1893f5e1{WRITING}->null:IDLE-->WRITING
broker_1               | 2019-07-18 14:57:26,860 DEBUG - flushed 2419 SocketChannelEndPoint@45d16ef7{/172.23.0.1:58168<->/172.23.0.2:8080,OPEN,fill=FI,flush=W,to=5034/300000}{io=1/1,kio=1,kro=1}->WebSocketServerConnection@82e4b9a8[ios=IOState@dbe784c[OPEN,in,out],f=Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null],g=Generator[SERVER,validating,+rsv1],p=Parser@17a19726[ExtensionStack,s=START,c=0,len=0,f=null]]
broker_1               | 2019-07-18 14:57:26,861 DEBUG - Flushed=true written=2419 remaining=0 WriteFlusher@1893f5e1{WRITING}->null
broker_1               | 2019-07-18 14:57:26,861 DEBUG - update WriteFlusher@1893f5e1{IDLE}->null:WRITING-->IDLE
broker_1               | 2019-07-18 14:57:26,861 DEBUG - Flushing Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null]
broker_1               | 2019-07-18 14:57:26,862 DEBUG - Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null] processing 0 entries: []
broker_1               | 2019-07-18 14:57:26,862 DEBUG - Processing null

Nothing too suspicious. Would you mind creating a ticket in the Alpakka issue tracker?

@2m done https://github.com/akka/alpakka/issues/1833

Thanks.