Alpakka-MQTT not subscribing

Hi all!

I’ve got a question about alpakka streaming.

I’m trying to use alpakka-mqtt to subscribe to a mqtt topic, poll a lagom service, convert event and post to another mqtt topic. My problem is that alpakka is only connecting to the mqtt broker but not subscribing to the defined topic. I hope someone can drop me a hint what i’m doing wrong.

My Code is: (it’s a lagom service)

private FileRequestHandler(Config cfg, ActorSystem sys) {
        final Materializer materializer = ActorMaterializer.create(sys);
        final RecordSerializer recSer = new RecordSerializer();
        final ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new Jdk8Module());
        mapper.registerModule(new ParameterNamesModule());

        MqttConnectionSettings connectionSettings =
            MqttConnectionSettings.create(
                "tcp://localhost:1883",
                "", //clientId is set in sink/source individually
                new MemoryPersistence()
            );

        Sink<MqttMessage, CompletionStage<Done>> mqttSink =
                MqttSink.create(connectionSettings.withClientId("m2m-drv-sta2-FileReq-sink"), MqttQoS.atLeastOnce());

        Source<MqttMessage, CompletionStage<Done>> mqttSource =
            MqttSource.atLeastOnce(
                connectionSettings
                    .withClientId("m2m-drv-sta2-FileReq-source")
                    .withCleanSession(false),
                MqttSubscriptions.create("sta2/p/+/evt/RequestFileInfo/#", MqttQoS.atLeastOnce()),
                BUFFER_SIZE).map(mqttIn -> {
            MqttMessage msg = mqttIn.message();
            ByteBuffer buf = msg.payload().asByteBuffer();
            log.info("We got the message from mqtt: Topic="+msg.topic()+" Payload="+buf.array().length);
            RequestFileInfo nfo = (RequestFileInfo)recSer.fromBinary(buf.array(), "RequestFileInfo");
            return new RequestFileInfoContainer(msg,nfo);
        }).map(cont -> {
            //TODO: poll lagom service and convert event

            log.info("pushing back to mqtt");
            byte[] payload = mapper.writeValueAsBytes(""/*TODO: insert converted value*/);
            return MqttMessage.apply("out/FileMetadata/avro", ByteString.fromArray(payload));
        });

        mqttSource.runWith(mqttSink, materializer);
    }

Thanks for any help!

geetings,
Michael

Hi Michael,

your code looks good. The Alpakka Mqtt connector gives the topic filter pattern to the PAHO client without any modification. So it should be fine.

Are you sure that you have topic that matches the given pattern?

Hi Martin,

yes. I subscribed & posted to the same topic with mqtt.fx (mqtt client) and i got the message i published in the subscription, but nothing in the service. In vernemq-trace tool i can see that both source and sink are connecting to the broker, but no subscription is performed.

In https://doc.akka.io/docs/alpakka/current/mqtt.html i read that i could use a flow to subscribe & publish in a single flow. Do you know if i am forced to use a flow or should this work also?

(sorry - i’m new to this alpakka streaming stuff)

greetings,
Michael

The implementation for consuming messages in Flow and in Source is the same. So it does not really matter which one you choose. The difference is only in the shape of the stream.

Since it is unclear why the subscription does not get through, you should try enabling debug logging. Maybe that will give some clue. Take a look at mqtt test logging configuration here and here to see how to enable verbose debug logging to a file.

Ahhh thank you!

2019-02-27 14:13:52,462 DEBUG [MQTT Rec: m2m-drv-sta2-FileReq-source] [o.e.p.c.m.internal.CommsReceiver] m2m-drv-sta2-FileReq-sink: Stopping, MQttException
org.eclipse.paho.client.mqttv3.MqttException: Invalid client ID

I changed the clientid => now its working as expected.

Last question: If i’m using atLeastOnce i need to ack every message after it has been posted to the other topic - where do have to call msg.ack() to make sure no message is lost?

greetings,
Michael

Thank you!

Glad that you were able to solve the issue!

There is an ack() method on MqttMessageWithAck.

Yes i already saw this method - but where can i call this in the stream to ensure a transaction?
I need to call it either AFTER the the new message has been written to the sink OR the processing of an event is filtered out.
Currently i observe that new messages are delivered to the stream immediately as i post them and i do not call msg.ack()… don’t know why that is…

You can use MqttFlow.atLeastOnce which will give you a flow. The messages you send to the flow will be sent to the broker and then emitted by the flow where you will be able to continue the stream and call ack() on those messages.

Ok - i this case i need the flow - thank you for your help!