Akka MQTT Streaming in Java

Hello, I'm working with the Alpakka MQTT connector and following the documentation on the Akka website.

The Maven dependencies there specify Akka 2.6.14 and Scala 2.12. In another project we're using Akka 2.6.17 and Scala 2.13. I was wondering whether I missed a newer version of the connector, or whether Akka 2.6.14 and Scala 2.12 are the only versions that currently work with it. I tried Akka 2.6.17 and Scala 2.13, but the connector didn't seem to work. Any suggestions or explanations?
Thanks,
AAA

Hi @abdulahadanwer

You can’t mix Scala 2.12 and 2.13 in one project.
Similarly, you should make sure to use all Akka libraries of the same version.

See also Versioning • Alpakka Documentation

Hope this helps,
Enno.

Thanks for the response @ennru, I appreciate it. My actual question, though, was whether I can use the MQTT streaming connector with Akka 2.6.17 and Scala 2.13. The other projects are separate, so Scala 2.12 and 2.13 are never mixed in a single project. I mentioned 2.13 only because Scala 2.13 and Akka 2.6.17 are used consistently in all our other projects, and I wanted the MQTT project to use the same versions. But when I switched the MQTT project from Akka 2.6.14 and Scala 2.12 to Akka 2.6.17 and Scala 2.13, it didn't seem to work. I wanted to know whether that is expected.

Thanks
Ahad

Alpakka MQTT streaming is expected to work with Scala 2.13 and Akka 2.6.17 (and 2.6.18, which is the latest release).
In case it does not, please file an issue in the Alpakka repository giving more details of what fails.

Cheers,
Enno.

Before filing, let me briefly describe what I'm doing, in case you can spot something wrong. Otherwise I'll go and file an issue in the repo.
The following dependencies are from MQTT • Alpakka Documentation:

<properties>
  <akka.version>2.6.14</akka.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-mqtt_${scala.binary.version}</artifactId>
    <version>3.0.4</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>

I create the actor system like this:

final ActorSystem mqttSubscriberClient1 = ActorSystem.create("MqttSubscriberClient1");

I then use this system in the following code, and nothing seems to break:

MqttSessionSettings settings = MqttSessionSettings.create();
MqttClientSession session = ActorMqttClientSession.create(settings, system);

Flow<ByteString, ByteString, CompletionStage<Tcp.OutgoingConnection>> connection =
    Tcp.get(system).outgoingConnection(host, port);

Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
    Mqtt.clientSessionFlow(session, ByteString.fromString(connectionId)).join(connection);

SourceQueueWithComplete<Command<Object>> commands =
    Source.<Command<Object>>queue(20, OverflowStrategy.fail(), 20)
        .via(mqttFlow)
        .collect(
            new JavaPartialFunction<DecodeErrorOrEvent<Object>, Publish>() {
              @Override
              public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
                if (x.getEvent().isPresent() && x.getEvent().get().event() instanceof Publish)
                  return (Publish) x.getEvent().get().event();
                else throw noMatch();
              }
            })
        .wireTap(event -> {
          System.out.println("--------------------------------------------------------------------------------");
          System.out.println("Client: " + connectionId + " received payload: " + event.payload().utf8String());
          // System.out.println("Client: " + connectionId + " received topic name: " + event.topicName());
          // System.out.println("Client: " + connectionId + " received packet id: " + event.packetId().toString());
          // System.out.println("Client: " + connectionId + " received flags: " + event.flags());
          System.out.println("--------------------------------------------------------------------------------");
        })
        .toMat(Sink.ignore(), Keep.left())
        .run(system);

However, when I upgrade the versions as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>maxio-mqtt-streaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <akka.version>2.6.17</akka.version>
        <scala.binary.version>2.13</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.lightbend.akka</groupId>
            <artifactId>akka-stream-alpakka-mqtt-streaming_${scala.binary.version}</artifactId>
            <version>3.0.4</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream_${scala.binary.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor-typed_${scala.binary.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
</project>

There are no compilation errors, but running the main class throws an error at this same line:
final ActorSystem mqttSubscriberClient1 = ActorSystem.create("MqttSubscriberClient1");

When I converted the above using typed.ActorSystem, i.e.

final ActorSystem mqttSubscriberClient1  = ActorSystem.create(Behaviors.empty(), "MqttSubscriberClient1");

I still got the same error, which is as follows:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalStateException: You are using version 2.6.17 of Akka, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 2.6.17 of the [akka-stream-typed] artifacts to your project. Here's a complete collection of detected artifacts: (2.6.14, [akka-stream-typed]), (2.6.17, [akka-actor, akka-actor-typed, akka-protobuf-v3, akka-slf4j, akka-stream]). See also: https://doc.akka.io/docs/akka/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
	at akka.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:184)
	at akka.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:162)
	at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:1033)
	at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:1022)
	at akka.actor.ActorSystemImpl._start(ActorSystem.scala:1022)
	at akka.actor.ActorSystemImpl.start(ActorSystem.scala:1045)
	at akka.actor.typed.ActorSystem$.createInternal(ActorSystem.scala:290)
	at akka.actor.typed.ActorSystem$.apply(ActorSystem.scala:198)
	at akka.actor.typed.ActorSystem$.create(ActorSystem.scala:235)
	at akka.actor.typed.ActorSystem.create(ActorSystem.scala)
	at SubscribersMain1.main(SubscribersMain1.java:7)

Please let me know if I’m missing anything.

Thanks,
Ahad
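
Going by the exception text above, which reports akka-stream-typed at 2.6.14 while the other detected Akka artifacts are at 2.6.17, one thing to try is the fix the message itself suggests: an explicit dependency on akka-stream-typed at the same Akka version. This is a minimal sketch and has not been verified against this project. It assumes the second pom.xml above, with its akka.version and scala.binary.version properties, and that no other Akka module is resolved at an older version.

```xml
<!-- Add inside <dependencies> of the upgraded pom.xml.
     Pins akka-stream-typed to ${akka.version} (2.6.17 here) so it matches
     akka-actor, akka-actor-typed and akka-stream, replacing the 2.6.14
     version that the exception reports as being picked up (perhaps indirectly). -->
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-typed_${scala.binary.version}</artifactId>
  <version>${akka.version}</version>
</dependency>
```

Running mvn dependency:tree before and after the change should show which dependency brings in the older artifact and whether the explicit version now wins.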