Alpakka MQTT Streaming: Performance problem

Hi everyone,

first of all: I’m new to Akka streams and Alpakka…

I’m implementing an application which receives sensor measurement data via Bluetooth from a sensor node and then sends this data via Alpakka MQTT streaming to an MQTT broker.
Between 60 and 80 sensor values are received per second.

I have a running setup which used the Eclipse Paho based implementation. This setup meets my performance demands.

Now I wanted to try out the MQTT streaming implementation and face a performance issue.
I used the example code:

val settings = MqttSessionSettings()
val session = ActorMqttClientSession(settings)
val connection = Tcp().outgoingConnection("brokerIPAddress", 1883)
val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
  Mqtt
    .clientSessionFlow(session, ByteString("1"))
    .join(connection)
val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) =
  Source
    .queue(8, OverflowStrategy.dropHead)
    .via(mqttFlow)
    .collect {
      case Right(Event(p: Publish, _)) => p
    }
    .async
    .toMat(Sink.head)(Keep.both)
    .run()

Messages are sent as described in the docs:

session ! Command(Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, "sensors/temperature", ByteString(temp)))

But compared to the Paho based implementation only half of the messages get delivered to the broker.
The queue seems to be filled almost immediately and many messages are dropped.
The underlying transport seems to be quite slow and I don’t know how I could improve the performance.

I tried increasing the queue size to gargantuan values and tried out different OverflowStrategies, but the problem remains.

Any help is highly appreciated :)

Kind regards,
Jan

Hi Jan,

even a large buffer can be quickly filled up by a tight loop sending messages to the queue.

I would recommend back-pressuring on the sending side. The SourceQueue.offer method returns a Future that is completed once the element has been accepted into the stream.

I am not sure what triggers message sends in your code, or whether there is a nice way to wait until that offer Future completes, but you should propagate the back-pressure signal as far as possible toward the source of the messages.
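
To illustrate the idea for a plain Akka Streams Source.queue (not the MQTT session), here is a minimal sketch of waiting on the offer Future before sending the next element. It assumes Akka 2.6 or later, where an implicit ActorSystem also provides the materializer. The object name OfferBackpressure, the throttle rate and the counting sink are made up for the example and only stand in for a slow downstream.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka or Alpakka.
object OfferBackpressure {

  // Pushes `count` elements through a small queue and returns how many reached the sink.
  def deliverAll(count: Int): Int = {
    // Assumption: Akka 2.6+, so the implicit system also supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("offer-demo")
    import system.dispatcher

    val (queue, received) =
      Source
        .queue[Int](8, OverflowStrategy.backpressure)
        .throttle(100, 1.second) // stand-in for a slow downstream
        .toMat(Sink.fold[Int, Int](0)((n, _) => n + 1))(Keep.both)
        .run()

    // Offer the next element only once the previous offer's Future has completed.
    def offerFrom(i: Int): Future[Unit] =
      if (i > count) Future.unit
      else
        queue.offer(i).flatMap {
          case QueueOfferResult.Enqueued => offerFrom(i + 1)
          case other =>
            Future.failed(new IllegalStateException(s"element $i not enqueued: $other"))
        }

    // Complete the queue after the last offer, then wait for the sink's count.
    val total = offerFrom(1).flatMap { _ =>
      queue.complete()
      received
    }

    try Await.result(total, 30.seconds)
    finally system.terminate()
  }
}
```

Because each offer is chained on the previous one, the producer never runs ahead of the 8-element buffer, so nothing has to be dropped. A tight loop calling offer without waiting would instead overrun the buffer almost immediately.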

Hi Martynas,

thank you for your message!

Basically I have 8 listeners for Bluetooth sensor readings (temperature, pressure, gyroscope, etc.). They react whenever they receive data and do nothing more than interpret the received byte arrays and publish them via session ! Command(Publish(... .
Each of these 8 listeners publishes to its own topic, so the app publishes to 8 topics.

I also noticed that, after a few seconds, some messages on 2-3 of the topics are not delivered at all, while the other topics continue to work fine.

The client example in the docs also says:

Instead, the session is told to perform Publish given that it can retry continuously with buffering until a command flow is established.

Does this mean I have to implement something for the session to retry continuously?
I was just using the two snippets from the documentation.

EDIT:
When I use commands.offer(Command(Publish(... instead of sending them via ! to the session, no messages are delivered at all.

Ahh yes. I mixed up the MQTT streaming and Paho connectors. Yes, messages should be published by sending them to the session actor.

Are you able to put together a reproducer?

We are using this in production and have no issues with message loss. With the Publish you can provide a promise that will be completed once the PubAck is received. You can then back-pressure on that.

Also try a QoS of 0. If that works for you, then the problem is likely the lack of back-pressure on the publish, as described above.

We may need to improve our docs. :-) Meanwhile, this test illustrates how an object can be carried such that it is output as an event in response to the puback: https://github.com/akka/alpakka/blob/master/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala#L741
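
A rough sketch of how the carry mechanism from the linked test might be used to back-pressure publishing: a Promise travels with each Publish and is completed when the matching PubAck event comes out of the flow. The names AckedPublish, publish and completeOnPubAck are hypothetical. The sketch assumes that the session flow is typed with Promise[Done] as its carry type (instead of Nothing), and that the Alpakka version in use offers the Command(packet, carry) form and emits Event(pubAck, Some(carry)) as the test suggests.

```scala
import akka.Done
import akka.stream.alpakka.mqtt.streaming._
import akka.stream.alpakka.mqtt.streaming.scaladsl.MqttClientSession
import akka.util.ByteString
import scala.concurrent.{Future, Promise}

// Hypothetical helper object, not part of Alpakka.
object AckedPublish {

  // Sends a QoS 1 Publish to the session, carrying a Promise along with it.
  // The returned Future completes once completeOnPubAck has seen the matching PubAck.
  def publish(session: MqttClientSession, topic: String, payload: ByteString): Future[Done] = {
    val acked = Promise[Done]()
    val packet = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery, topic, payload)
    // Assumption: the carried object is handed back in the PubAck event.
    session ! Command[Promise[Done]](packet, acked)
    acked.future
  }

  // Call this for every element on the events side of the session flow.
  def completeOnPubAck(event: Either[MqttCodec.DecodeError, Event[Promise[Done]]]): Unit =
    event match {
      case Right(Event(_: PubAck, Some(acked))) => acked.trySuccess(Done)
      case _                                    => () // other events and decode errors are ignored here
    }
}
```

A listener could then chain on the returned Future before publishing its next reading, while the events side calls completeOnPubAck for each element, for example from a Sink.foreach. With QoS 0 there is no PubAck, so this approach only applies to QoS 1 publishes.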

This PR may help: https://github.com/akka/alpakka/pull/1908

Hi,
I don’t think I can provide a reproducer.

Due to time constraints we stuck with the Eclipse Paho based implementation.

We were using QoS 0.

Maybe in the future, when I have the chance, I will try the MQTT Streaming implementation again.

Best regards,

Jan