How to keep the connection to a WebSocket server open?

(Bifunctor) #1

Hi all

I have the following code, which does not keep the connection to the WebSocket server open:

import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent._
import scala.util.{Failure, Success}

object WsActor {
  def props: Props = Props(new WsActor)
}

final class WsActor extends Actor with ActorLogging {

  import com.sweetsoft.WsConnector._

  implicit val materializer: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = context.system.dispatcher
  implicit val actor = context.system

  // Future[Done] is the materialized value of Sink.foreach,
  // emitted when the stream completes
  private val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        // strict text frames are matched here; no handling is shown
      case _ =>
        println("Unknown messages.")
    }

  //private val outgoing: Source[Message, Promise[Option[Message]]] =
  //  Source.maybe[Message]

  //  val flow: Flow[Message, Message, Promise[Option[Message]]] =
  //    Flow.fromSinkAndSourceMat(incoming, Source.maybe[Message])(Keep.right)

  log.info("Websocket actor started.")

  override def receive: Receive = {
    case Initialized =>
      log.info("Initialization to receive messages via stream.")
      sender() ! Ack
    case Completed =>
      log.info("Streams completed.")
      sender() ! Ack
    case Msg(value) =>

      val replyTo = sender()
      val flow: Flow[Message, Message, Promise[Option[Message]]] =
        Flow.fromSinkAndSourceMat(incoming, Source.single(TextMessage(value)).concatMat(Source.maybe[Message])(Keep.right))(Keep.right)

      val (upgradeResponse, _) =
        Http().singleWebSocketRequest(WebSocketRequest("ws://"), flow.mapAsync(4)(msg => Future(msg)))

      upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
          Future.successful(Done)
        } else {
          throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
      }.onComplete {
        case Success(_) =>
          replyTo ! Ack
        case Failure(ex) => log.error(ex.getMessage)
      }

    case Failed(ex) =>
      log.info(s"Stream failed with ${ex.getMessage}.")
  }
}


So every time a message is received, it closes the connection and opens a new one for the next request.
The question is: how can I keep the connection open?


(Johannes Rudolph) #2

Hi @bifunctor,

In brief, you can keep such a connection open by using a Source.actorRef or a Source.queue. actorRef directly materializes into an ActorRef that you can then use to send messages to the WS connection. queue will give you a queue-like interface to enqueue elements to be sent on the WS connection.

This might require a bit of fiddling to get the right materialized values out of the call to singleWebSocketRequest (right now you are throwing the materialized value away).
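
To illustrate the Source.queue variant, here is a minimal sketch, not a definitive implementation. The names WsClientSketch, clientFlow, connect and onText are made up for this example, and the buffer size and overflow strategy are arbitrary choices. It assumes an implicit ActorSystem and Materializer are in scope, as they are in the actor above. The point is that Keep.right keeps the queue as the flow's materialized value, so singleWebSocketRequest hands it back together with the upgrade response.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.Future

// Hypothetical helper object, not part of the original code.
object WsClientSketch {

  // Incoming frames go to the sink; outgoing frames are taken from a queue.
  // Keep.right exposes the queue as the materialized value of the flow.
  def clientFlow(onText: String => Unit): Flow[Message, Message, SourceQueueWithComplete[Message]] = {
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case TextMessage.Strict(text) => onText(text)
        case _                        => () // streamed and binary frames are ignored in this sketch
      }

    // Buffer size 16 and backpressure are illustrative assumptions.
    val outgoing: Source[Message, SourceQueueWithComplete[Message]] =
      Source.queue[Message](16, OverflowStrategy.backpressure)

    Flow.fromSinkAndSourceMat(incoming, outgoing)(Keep.right)
  }

  // Opens ONE connection and returns the upgrade response plus the queue.
  // The outgoing side stays open until the queue is completed or the stream fails.
  def connect(url: String, onText: String => Unit)(
      implicit system: ActorSystem,
      mat: Materializer
  ): (Future[WebSocketUpgradeResponse], SourceQueueWithComplete[Message]) =
    Http().singleWebSocketRequest(WebSocketRequest(url), clientFlow(onText))
}
```

In the actor you would then call connect once (for example in preStart), keep the returned queue in a field, and handle Msg(value) with queue.offer(TextMessage(value)) instead of calling singleWebSocketRequest for every message. Source.actorRef works along the same lines, except that the materialized value is an ActorRef you send messages to.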