Alpakka MQTT Streaming: Client does not complain when there is no tcp connection

In the MQTT Streaming client, when there is no tcp connection: Why does the connection not throw an exception?
https://doc.akka.io/docs/alpakka/current/mqtt-streaming.html#flow-through-a-client-session

E.g. this sends data into the void:

val connection = Tcp().outgoingConnection("127.0.0.1", 9999)
val mqttFlow =
  Mqtt
    .clientSessionFlow(clientSession, ByteString(connectionId))
    .join(connection)

Thanks
Paul

Creating this Flow by itself does nothing: it just creates a ‘template’ of a stream component.

The section you quote also continues to explain how to use this template. I would expect, in that case, the events future to complete with a failure containing the connection exception. Does that not happen for you?
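
To illustrate the "template" point, here is a minimal sketch in plain Akka Streams, without MQTT. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream); the object name BlueprintDemo and the counter are made up for illustration. Defining the flow executes nothing; the map function only runs once a stream containing the flow is materialized.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the thread's code.
object BlueprintDemo {
  // Returns (elements processed before run, elements processed after run, stream result).
  def run(): (Int, Int, Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("blueprint-demo")
    try {
      val processed = new AtomicInteger(0)

      // Only a description of a processing step: nothing executes here.
      val doubler = Flow[Int].map { i =>
        processed.incrementAndGet()
        i * 2
      }
      val beforeRun = processed.get()

      // Materializing the stream is what actually runs the flow.
      val result = Await.result(Source(1 to 3).via(doubler).runWith(Sink.seq), 5.seconds)
      (beforeRun, processed.get(), result)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The same applies to the mqttFlow value in the first post: joining the session flow with the TCP connection only describes the wiring, and no connection attempt is made until the flow is run as part of a stream.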

Hi Arnout,

Thx for your reply.
Sorry, my question should have been more verbose.
My implementation is slightly different from the example code, and it does not throw an exception when there is no tcp connection.

This is a reproducer:

Running it (without a MQTT server started) yields this log:

Client: pub-1 starting...
Client: pub-1 bound to: 127.0.0.1:1883
Client: pub-1 send payload: 1
Client: sub-2 starting...
Client: sub-1 starting...
Client: sub-2 bound to: 127.0.0.1:1883
Client: sub-1 bound to: 127.0.0.1:1883
Client: pub-1 send payload: 2
Client: pub-1 send payload: 3
…

I noticed that in the Akka Streams Tcp examples the connection is wired in with the “via” operator (vs. the “join” operator here), and these examples correctly complain, e.g. with:

sample.stream.TcpEcho$    | Client: 1 closed: Failure(akka.stream.StreamTcpException: Tcp command [Connect(127.0.0.1:6000,None,List(),None,true)] failed because of java.net.ConnectException: Connection refused)

Could that be the reason?

Kind regards
Paul

That code would indeed not throw an exception. The reason is that the connection is made when the stream is executed with .run(), but .run() is asynchronous: it returns immediately, without waiting to see whether the connection could be established (since that could block or take a while).

You are using Sink.ignore. Sink.ignore will produce a Future[Done] value on each materialization (so on each .run()), and that future should be completed with a failure when the stream fails.
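
A minimal sketch of that behaviour without MQTT, assuming Akka 2.6 or later and assuming that nothing is listening on 127.0.0.1:9999 (the address from the first post). The object name ConnectFailureDemo is hypothetical. The call to runWith returns a Future right away, and the connection failure only shows up later as the failed result of that Future.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source, Tcp}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Try

// Hypothetical demo object, not part of the thread's code.
object ConnectFailureDemo {
  def run(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("connect-failure-demo")
    try {
      // Assumption: no server is listening on this address.
      val connection = Tcp().outgoingConnection("127.0.0.1", 9999)

      // Returns immediately; the connection attempt happens in the background.
      val done: Future[Done] =
        Source.single(ByteString("hello")).via(connection).runWith(Sink.ignore)

      // Await.ready waits for completion without rethrowing the failure,
      // so the outcome can be inspected as a Try.
      Await.ready(done, 10.seconds)
      done.value.get
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With no server on that port, the returned Try should be a Failure, typically carrying a StreamTcpException like the one in the TcpEcho log line quoted earlier in the thread.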

You can get a hold of that future by using Keep.both rather than only keeping the queue with Keep.left, so:

val (commands, done) = {
  Source
    .queue(10, OverflowStrategy.backpressure, 10)
    .via(mqttFlow)
    // Only the Publish events are interesting
    .collect { case Right(Event(p: Publish, _)) => p }
    .wireTap(event => logger.info(s"Client: $connectionId received payload: ${event.payload.utf8String}"))
    .toMat(Sink.ignore)(Keep.both)
    .run()
}
logger.info(s"Client: $connectionId bound to: $host:$port")

You can now use done to report an error when the stream fails.

Thanks for your explanation, I see your point.

I use Keep.both now and then get the expected: java.net.ConnectException
for the case “no tcp connection on startup”.

However, for the case “tcp connection goes down while the client is running”, the client does not restart with this setup. I simulate “tcp connection goes down” by restarting the Docker container with the MQTT broker.

    val (commands, done) = {
      Source
        .queue(10, OverflowStrategy.backpressure,10)
        .via(restartFlow)
        //Only the Publish events are interesting
        .collect { case Right(Event(p: Publish, _)) => p }
        .wireTap(event => logger.info(s"Client: $connectionId received payload: ${event.payload.utf8String}"))
        .toMat(Sink.ignore)(Keep.both)
        .run()
    }

    done.onComplete{
      case Success(value) => logger.info(s"Flow stopped with: $value. Probably lost tcp connection")
      case Failure(exception) => logger.error("Error no tcp connection on startup:", exception)
    }

    //WIP: Due to the async nature of the flow above, we don't know if we are really connected
    logger.info(s"Client: $connectionId bound to: $host:$port")
    MqttClient(session = clientSession, commands = commands)

I also tried using a RestartSource, but this did not compile: the materialized value for commands came out as NotUsed, regardless of Keep.both or some mapMaterializedValue magic.

I see the ConnAck / SubAck / PubAck events sent from the server. Reacting to those and thus responding on the MQTT protocol level would be an option.

What is the recommended way for the case “tcp connection goes down while the client is running”?

Thanks
Paul

Complete example: https://github.com/pbernet/akka_streams_tutorial/blob/master/src/main/scala/alpakka/mqtt/MqttEcho.scala
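
On the RestartSource attempt mentioned above: the NotUsed is by design. RestartSource.withBackoff creates a fresh inner source on every restart, so it cannot hand out the queue of any single one, and its materialized value is always NotUsed. The sketch below shows this at the type level and, as one possible arrangement rather than a confirmed recommendation, keeps the queue outside the restarting part by wrapping only the flow. It assumes Akka 2.6.10 or later (for RestartSettings); the name RestartMatValueDemo is hypothetical and a simple map stage stands in for the MQTT flow.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, RestartSettings}
import akka.stream.scaladsl.{Flow, Keep, RestartFlow, RestartSource, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the thread's code.
object RestartMatValueDemo {
  private val settings =
    RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2)

  // The queue created inside the factory is not exposed:
  // the materialized value of the wrapped source is NotUsed.
  val restartingSource: Source[Int, NotUsed] =
    RestartSource.withBackoff(settings) { () =>
      Source.queue[Int](10, OverflowStrategy.backpressure)
    }

  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("restart-demo")
    try {
      // Stand-in for the MQTT flow; only this part is restarted.
      val restartingFlow: Flow[Int, Int, NotUsed] =
        RestartFlow.withBackoff(settings)(() => Flow[Int].map(_ + 1))

      // The queue lives outside the restarting flow, so Keep.both can expose it.
      val (queue, first) =
        Source.queue[Int](10, OverflowStrategy.backpressure)
          .via(restartingFlow)
          .toMat(Sink.head)(Keep.both)
          .run()

      Await.result(queue.offer(1), 5.seconds)
      Await.result(first, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that this only covers the stream wiring. Whatever has to happen on the MQTT protocol level after a reconnect is not addressed by this sketch.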

My query is related to this topic, so I thought I would post it here. I was following the documentation to write a client subscriber.

After the following, I’m not sure how I could retrieve or interact with subscribed messages. Any lead?
Pair<SourceQueueWithComplete<Command<Object>>, CompletionStage<Publish>> run =
    Source.<Command<Object>>queue(3, OverflowStrategy.fail())
        .via(mqttFlow)
        .collect(
            new JavaPartialFunction<DecodeErrorOrEvent<Object>, Publish>() {
              @Override
              public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
                if (x.getEvent().isPresent() && x.getEvent().get().event() instanceof Publish)
                  return (Publish) x.getEvent().get().event();
                else throw noMatch();
              }
            })
        .toMat(Sink.head(), Keep.both())
        .run(system);

SourceQueueWithComplete<Command<Object>> commands = run.first();
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
commands.offer(new Command<>(new Subscribe(topic)));
session.tell(
    new Command<>(
        new Publish(
            ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
            topic,
            ByteString.fromString("ohi"))));

// for shutting down properly
commands.complete();
commands.watchCompletion().thenAccept(done -> session.shutdown());

Also, the following example shows how to subscribe with the client, but nothing about how to get the messages.

Any lead/pointers?
Thanks,
Ahad
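
A note on the snippet above: the Publish events that pass the collect stage flow on into the sink, so the sink is where received messages are handled. Sink.head completes its CompletionStage (run.second() in the Java code) with the first Publish only and then cancels the stream. A minimal sketch of the difference, in Scala like most of this thread and assuming Akka 2.6 or later; plain strings stand in for Publish events, and ReceiveDemo and all values in it are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicReference

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the thread's code.
object ReceiveDemo {
  // Returns (first message seen by Sink.head, all messages seen by Sink.foreach).
  def run(): (String, Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("receive-demo")
    try {
      // Stand-in for the stream of Publish events after the collect stage.
      val incoming = Source(List("m1", "m2", "m3"))

      // Sink.head: the materialized Future holds only the first element.
      val first = Await.result(incoming.runWith(Sink.head), 5.seconds)

      // Sink.foreach: the handler is called once for every element.
      val received = new AtomicReference(Vector.empty[String])
      val finished = incoming.runWith(Sink.foreach[String] { msg =>
        received.updateAndGet(_ :+ msg) // a real handler would process the payload here
        ()
      })
      Await.result(finished, 5.seconds)

      (first, received.get())
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Applied to the MQTT snippet, this would mean replacing Sink.head() with a sink that handles each Publish, for example a foreach sink; that substitution is a suggestion under the assumptions above, not something taken from the documentation page.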