Alpakka MQTT Streaming: dead letters

So I’m just trying to follow the MQTT Streaming documentation ( https://doc.akka.io/docs/alpakka/current/mqtt-streaming.html ) with akka-stream-alpakka-mqtt-streaming version 1.1.0 to create a connection to the public MQTT broker iot.eclipse.org.

object Test2 extends App {
  implicit val system = ActorSystem("test2")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val topic = "/test2/"
  val clientId = UUID.randomUUID().toString
  val settings = MqttSessionSettings()
  val session = ActorMqttClientSession(settings)

  val connection = Tcp().outgoingConnection("iot.eclipse.org", 1883)

  val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
    Mqtt
      .clientSessionFlow(session, ByteString("1"))
      .join(connection)


  val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) =
    Source
      .queue(2, OverflowStrategy.fail)
      .via(mqttFlow)
      .collect {
        case Right(Event(p: Publish, _)) => p
      }
      .toMat(Sink.head)(Keep.both)
      .run()

  commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
  commands.offer(Command(Subscribe(topic)))
  session ! Command(
    Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi"))
  )

  // for shutting down properly
  commands.complete()
  commands.watchCompletion().foreach(_ => session.shutdown())

}

I’ve enabled log level “DEBUG” and got the following output:

[DEBUG] [07/25/2019 15:35:30.440] [main] [EventStream] StandardOutLogger started
[DEBUG] [07/25/2019 15:35:30.834] [main] [EventStream(akka://test2)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/25/2019 15:35:30.834] [main] [EventStream(akka://test2)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/25/2019 15:35:30.838] [main] [EventStream(akka://test2)] Default Loggers started
[DEBUG] [07/25/2019 15:35:30.838] [main] [EventStream(akka://test2)] Default Loggers started
[DEBUG] [07/25/2019 15:35:31.359] [test2-akka.actor.default-dispatcher-4] [akka://test2/system/IO-TCP/selectors/$a/0] Resolving iot.eclipse.org before connecting
[DEBUG] [07/25/2019 15:35:31.385] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-DNS] Resolution request for iot.eclipse.org from Actor[akka://test2/system/IO-TCP/selectors/$a/0#-1318290068]
[DEBUG] [07/25/2019 15:35:31.403] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-TCP/selectors/$a/0] Attempting connection to [iot.eclipse.org/198.41.30.241:1883]
[DEBUG] [07/25/2019 15:35:31.678] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-TCP/selectors/$a/0] Connection established to [iot.eclipse.org:1883]
[DEBUG] [07/25/2019 15:35:31.755] [test2-akka.actor.default-dispatcher-3] [akka://test2/user/client-connector-0] Start timer [receive-connack] with generation [1]
[DEBUG] [07/25/2019 15:35:31.757] [test2-akka.actor.default-dispatcher-3] [akka://test2/user/client-connector-0] Cancel all timers
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$SubscribeReceivedLocally] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$QueueOfferCompleted] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$ConnectionLost] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[DEBUG] [07/25/2019 15:35:31.765] [test2-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://test2/system/StreamSupervisor-0)] [client-events] Upstream failed, cause: WatchedActorTerminatedException: Actor watched by [Watch] has terminated! Was: Actor[akka://test2/user/client-connector-0#-439865709]
[DEBUG] [07/25/2019 15:35:31.766] [test2-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://test2/system/StreamSupervisor-0)] [client-commandFlow] Downstream finished.

what aim I missing?

Solved the issue by adding a Thread.sleep(10000) before commands.complete().