Alpakka MQTT Streaming with credentials

I need to enable username/password authentication on an Alpakka MQTT streaming server session, so that clients have to connect with credentials.

As a client:

val connect: Connect =
  appConf.mqtt.username.map(username =>
      // `password` is assumed to be defined elsewhere, as in the original snippet
      Connect(clientName, ConnectFlags.CleanSession, username, password)
  ).getOrElse(Connect(clientName, ConnectFlags.CleanSession))


You simply add the credentials to the Connect packet.

As a server, you need to build more complicated logic.

val bindSource =
  Tcp()
    .bind("", port)
    .mapAsyncUnordered(maxConnections) { connection =>

This is pretty straightforward, mostly glue code.

def handlerGraph(connection: Tcp.IncomingConnection) = {
  val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
    Mqtt
      .serverSessionFlow(session, ByteString(connection.remoteAddress.getAddress.getAddress))

This is not so interesting either.

private def handler(s: Either[MqttCodec.DecodeError, Event[Nothing]]): Command[Nothing] = {
    s match {
      case Right(Event(c: Connect, _)) =>
        // Accept only when a username is present and the password matches
        if (c.userName.nonEmpty && c.password.contains("supersecret"))
          Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted))
        else
          Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionRefusedBadUsernameOrPassword))
      case Right(Event(cp: Subscribe, _)) =>
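        // The return-codes argument is garbled in the original; echoing the requested QoS flags is assumed here
        Command(SubAck(cp.packetId, cp.topicFilters.map(_._2)), None, None)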
      case Right(Event(publish @ Publish(flags, _, Some(packetId), _), _))
          if flags.contains(ControlPacketFlags.RETAIN) =>
        // body missing in the original snippet
      case _ =>
        // body missing in the original snippet

This is the function where you handle the messages. On connect, you can extract the username/password from the Connect message, check them against your db, and return an error code if they don't match.
Since a db lookup is asynchronous, this function will probably need to return a Future[List[Command[Nothing]]], and you will need to modify handlerGraph accordingly.
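
A minimal sketch of that asynchronous shape, using stand-in types so it runs without Alpakka. Everything here is hypothetical and only for illustration: `ConnectPacket`, `OtherPacket`, `Reply` and `CredentialStore` are not Alpakka names. In the real handler they would be `Connect`, the other `Event` payloads, `Command[Nothing]`, and your own db access.

```scala
import scala.concurrent.{ExecutionContext, Future}

object CredentialCheckSketch {
  // Stand-ins for the MQTT packets (hypothetical, not the Alpakka model classes)
  sealed trait Packet
  final case class ConnectPacket(clientId: String,
                                 username: Option[String],
                                 password: Option[String]) extends Packet
  case object OtherPacket extends Packet

  // Stand-ins for the commands the handler would send back
  sealed trait Reply
  case object Accepted extends Reply
  case object RefusedBadUsernameOrPassword extends Reply

  // Hypothetical credential store; a real one would query a database
  // and compare password hashes instead of plain strings.
  final class CredentialStore(users: Map[String, String]) {
    def verify(username: String, password: String)(
        implicit ec: ExecutionContext): Future[Boolean] =
      Future(users.get(username).contains(password))
  }

  // Asynchronous handler: zero or more replies per incoming packet
  def handler(store: CredentialStore)(packet: Packet)(
      implicit ec: ExecutionContext): Future[List[Reply]] =
    packet match {
      case ConnectPacket(_, Some(user), Some(pass)) =>
        store.verify(user, pass).map {
          case true  => List(Accepted)
          case false => List(RefusedBadUsernameOrPassword)
        }
      case _: ConnectPacket =>
        // Username or password missing: refuse without hitting the store
        Future.successful(List(RefusedBadUsernameOrPassword))
      case OtherPacket =>
        Future.successful(Nil)
    }
}
```

With a handler of this shape, one way to adapt handlerGraph (an assumption, not something the original code shows) is to run it through Akka Streams' `mapAsync(1)(...)`, which preserves packet order, and then flatten each list with `mapConcat(identity)` before feeding the commands back into the session flow.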