Connect an Akka WebSocket flow to a Graph

I am wondering what the best way is to connect a WebSocket flow to a graph that later fans out based on the stream messages. So far I have tried the following:

val diamond = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>

    val partition = builder.add(Partition[ReceivedSocketMsg](numPartitions, {
      case ReceivedSocketMsg(_, _, _, _, _, _, sym, _, _) => coinMapper(sym)
    }))

    socketConfig.subscribeFilterIds.foreach { coin =>
      val outFlow = builder.add(Sink.foreach[ReceivedSocketMsg](fanOut(_)))
      partition ~> outFlow
    }

    SinkShape(partition.in)
  }

  val sourceGraph = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>

    val sendStr = Source(List(TextMessage(compact(render(jsonMsg)))))
    val flow: Flow[Message, Message, Promise[Option[Message]]] =
      Flow.fromSinkAndSourceMat(Sink.foreach[Message](println),
        sendStr.concatMat(Source.maybe[Message])(Keep.right))(Keep.right)

    val (upgradeResponse, _) =
      Http().singleWebSocketRequest(WebSocketRequest(socketUrl), flow)

    val connected = upgradeResponse.map { upgrade =>
      // just like a regular http request we can access response status which is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that server support WebSockets
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    // and handle errors more carefully
    connected.onComplete(println)
    sender ! "Connected"

    val socketData = builder.add(flow.map(streamProcessor(_)))

    SourceShape(FlowShape(socketData.in, socketData.out).out)
  }

  Source.fromGraph(sourceGraph).to(diamond).run()

The helper function looks like this:

def streamProcessor(message: Message): ReceivedSocketMsg = {
  message match {
    case TextMessage.Strict(text) =>
      parse(text).extract[ReceivedSocketMsg]
  }
}

(This still needs to be improved with case matches for all the other message types.)
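
For what it's worth, here is a minimal sketch of how the other cases could be covered. It assumes Akka 2.6 or later (so a Materializer can be derived from the ActorSystem) and it returns the plain text instead of a ReceivedSocketMsg to stay self-contained. The names MessageText and textOf are made up for illustration. Streamed text frames are folded into one string, and binary frames are drained and dropped.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}
import scala.concurrent.{ExecutionContext, Future}

object MessageText {
  // Hypothetical helper: emits the full text of every text message and skips binary ones.
  def textOf(implicit mat: Materializer, ec: ExecutionContext): Flow[Message, String, NotUsed] =
    Flow[Message]
      .mapAsync[Option[String]](1) {
        // Already complete in memory
        case TextMessage.Strict(text) =>
          Future.successful(Some(text))
        // Streamed text: collect all chunks into one string
        case streamed: TextMessage =>
          streamed.textStream.runFold("")(_ + _).map(Some(_))
        // Binary: drain the data so the connection is not stalled, then drop it
        case binary: BinaryMessage =>
          binary.dataStream.runWith(Sink.ignore).map(_ => None)
      }
      .collect { case Some(text) => text }
}
```

The JSON step would then go after it, for example .via(MessageText.textOf).map(text => parse(text).extract[ReceivedSocketMsg]).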

I am currently getting the following error:

java.lang.IllegalStateException: Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected.

Any suggestions would be greatly appreciated.

Hi Syed,
You shouldn’t do this:

SourceShape(FlowShape(socketData.in, socketData.out).out)

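You need to connect socketData.in to something and return just the SourceShape, without wrapping it in a FlowShape. Otherwise the inlet is neither connected nor part of the returned shape, which is exactly what the error reports: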

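// Placeholder upstream: any Source that emits Message will do here
val someSource = builder.add(Source.single[Message](TextMessage("subscribe")))
someSource ~> socketData
SourceShape(socketData.out)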
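
To see the rule in isolation, here is a small self-contained sketch with no WebSocket involved. It assumes Akka 2.6 or later. The names ConnectedInletSketch and processing are hypothetical, and processing merely stands in for flow.map(streamProcessor(_)).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.SourceShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import scala.concurrent.Future

object ConnectedInletSketch {
  // Hypothetical stand-in for flow.map(streamProcessor(_)); like any Flow it has an inlet
  val processing: Flow[String, Int, NotUsed] = Flow[String].map(_.length)

  val sourceGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val upstream = builder.add(Source(List("a", "bb", "ccc")))
    val stage = builder.add(processing)
    upstream ~> stage      // the inlet is connected inside the graph
    SourceShape(stage.out) // only the still-open outlet is exposed
  }

  // Runs the graph and collects everything it emits
  def run()(implicit system: ActorSystem): Future[Seq[Int]] =
    Source.fromGraph(sourceGraph).runWith(Sink.seq)
}
```

If you remove the upstream ~> stage line, you should get the same "Inlets [...] were not returned in the resulting shape and not connected" exception as in your post.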

This answer is just about your error message.
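
On the wider question, one possible arrangement is sketched below. It is not a definitive solution. Note that the flow you pass to singleWebSocketRequest is the client-side handler: its input receives the server messages and its output is what gets sent to the server, so mapping over that flow does not give you the server data. Http().webSocketClientFlow works the other way around (you send into it and the server messages come out), which makes it easy to put a fan-out sink behind it. The sketch assumes Akka 2.6 or later with akka-http. Tick, symbols, route, fanOutSink and connect are hypothetical names standing in for ReceivedSocketMsg, subscribeFilterIds, coinMapper and so on.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Keep, Partition, Sink, Source}
import scala.concurrent.Future

object WebSocketFanOutSketch {
  // Hypothetical message type and routing table
  final case class Tick(symbol: String, payload: String)
  val symbols = Vector("BTC", "ETH")
  // Unknown symbols fall back to partition 0 in this sketch
  def route(t: Tick): Int = math.max(symbols.indexOf(t.symbol), 0)

  // A Sink with one Partition inlet and one Sink.foreach per symbol
  def fanOutSink(handle: (Int, Tick) => Unit): Sink[Tick, NotUsed] =
    Sink.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val partition = builder.add(Partition[Tick](symbols.size, t => route(t)))
      symbols.indices.foreach { i =>
        partition.out(i) ~> Sink.foreach[Tick](t => handle(i, t))
      }
      SinkShape(partition.in)
    })

  // Sends one subscribe message, keeps the connection open, and fans out the replies.
  // The returned Future completes once the upgrade response has been checked.
  def connect(url: String, subscribe: String, parse: String => Tick)(
      implicit system: ActorSystem): Future[Done] = {
    import system.dispatcher
    val outgoing =
      Source.single[Message](TextMessage(subscribe)).concat(Source.maybe[Message])
    val upgrade = outgoing
      .viaMat(Http().webSocketClientFlow(WebSocketRequest(url)))(Keep.right)
      .collect { case TextMessage.Strict(text) => parse(text) } // strict text only
      .to(fanOutSink((i, t) => println(s"partition $i: $t")))
      .run()
    upgrade.map { u =>
      if (u.response.status == StatusCodes.SwitchingProtocols) Done
      else throw new RuntimeException(s"Connection failed: ${u.response.status}")
    }
  }
}
```

Because the whole pipeline is a Source, a flow and a Sink, GraphDSL is only needed for the fan-out part.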