Hi all!
I’ve got a question about Alpakka streaming.
I’m trying to use alpakka-mqtt to subscribe to an MQTT topic, poll a Lagom service, convert the event, and publish the result to another MQTT topic. My problem is that Alpakka connects to the MQTT broker but never subscribes to the defined topic. I hope someone can give me a hint about what I’m doing wrong.
My code (it’s a Lagom service):
private FileRequestHandler(Config cfg, ActorSystem sys) {
    final Materializer materializer = ActorMaterializer.create(sys);
    final RecordSerializer recSer = new RecordSerializer();
    final ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new Jdk8Module());
    mapper.registerModule(new ParameterNamesModule());

    MqttConnectionSettings connectionSettings =
        MqttConnectionSettings.create(
            "tcp://localhost:1883",
            "", // clientId is set in sink/source individually
            new MemoryPersistence()
        );

    Sink<MqttMessage, CompletionStage<Done>> mqttSink =
        MqttSink.create(
            connectionSettings.withClientId("m2m-drv-sta2-FileReq-sink"),
            MqttQoS.atLeastOnce());

    Source<MqttMessage, CompletionStage<Done>> mqttSource =
        MqttSource.atLeastOnce(
                connectionSettings
                    .withClientId("m2m-drv-sta2-FileReq-source")
                    .withCleanSession(false),
                MqttSubscriptions.create("sta2/p/+/evt/RequestFileInfo/#", MqttQoS.atLeastOnce()),
                BUFFER_SIZE)
            .map(mqttIn -> {
                MqttMessage msg = mqttIn.message();
                ByteBuffer buf = msg.payload().asByteBuffer();
                log.info("We got the message from mqtt: Topic=" + msg.topic() + " Payload=" + buf.array().length);
                RequestFileInfo nfo = (RequestFileInfo) recSer.fromBinary(buf.array(), "RequestFileInfo");
                return new RequestFileInfoContainer(msg, nfo);
            })
            .map(cont -> {
                // TODO: poll lagom service and convert event
                log.info("pushing back to mqtt");
                byte[] payload = mapper.writeValueAsBytes("" /* TODO: insert converted value */);
                return MqttMessage.apply("out/FileMetadata/avro", ByteString.fromArray(payload));
            });

    mqttSource.runWith(mqttSink, materializer);
}
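To rule out a mistake in the topic filter itself, here is a minimal sketch of how I understand MQTT wildcard matching: `+` stands for exactly one level, and a trailing `#` stands for all remaining levels (including none). The class `MqttTopicFilter` and the sample topics such as `sta2/p/dev1/evt/RequestFileInfo/42` are hypothetical and only for illustration; they are not part of Alpakka or Paho, and the broker's own matching is what actually counts.

```java
// Hypothetical helper, not an Alpakka or Paho API: checks whether a concrete
// topic name would be matched by an MQTT topic filter with + and # wildcards.
final class MqttTopicFilter {
    private MqttTopicFilter() {}

    static boolean matches(String filter, String topic) {
        // Topics starting with '$' are not matched by filters that start with a wildcard.
        if (topic.startsWith("$") && (filter.startsWith("+") || filter.startsWith("#"))) {
            return false;
        }
        // Limit -1 keeps trailing empty levels, e.g. "a/" has two levels.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: only valid as the last level,
                // and it also matches the parent level (zero remaining levels).
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // No '#' seen: the level counts must be identical.
        return f.length == t.length;
    }

    public static void main(String[] args) {
        String filter = "sta2/p/+/evt/RequestFileInfo/#";
        // Sample topics are assumptions made up for this sketch.
        System.out.println(matches(filter, "sta2/p/dev1/evt/RequestFileInfo/42"));
        System.out.println(matches(filter, "sta2/p/dev1/sub/evt/RequestFileInfo/42"));
    }
}
```

By this reading, the filter in my source should match any topic of the form `sta2/p/<one level>/evt/RequestFileInfo/...`, so I don't think the filter string is the reason the subscription never shows up on the broker.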
Thanks for any help!
Greetings,
Michael