MQTT flow not subscribing

Hi there,

I have been trying to get the alpakka mqtt flow working https://doc.akka.io/docs/alpakka/current/mqtt.html#publish-and-subscribe-in-a-single-flow but for some reason it doesn’t seem to be receiving messages on the downstream side of the flow (publishing works).
I’ve tried making the same subscription with just the sink, and it appears to work fine.

Sample code I’m using:

val connectionSettings = MqttConnectionSettings(
    "ssl://iot.eclipse.org:8883",
    "some-client-id99791194779sdfgsdgf84984",
    new MemoryPersistence
  )

val flow = MqttFlow.atMostOnce(
    connectionSettings,
    MqttSubscriptions("prefixx/#", MqttQoS.atLeastOnce),
    bufferSize = 5,
    MqttQoS.AtLeastOnce
  )

  val source = MqttSource.atMostOnce(
    connectionSettings,
    MqttSubscriptions("prefixx/#", MqttQoS.atLeastOnce),
    5
  )

// works
source.runForeach(println) 

// publishes but does not print
Source(List(AkkaMqttMessage("prefixx/321", ByteString("test test"))))
    .via(flow)
    .runForeach(println)

I’ve tested using MQTTBox to send and receive on the eclipse test server.
Anyone have an idea why this might be happening?