Proper Akka websocket client to receive infinite incoming stream

I’m trying to connect to an infinite incoming WebSocket stream, as described here. I only receive a couple of messages:

[info] -----------------------
[info] 2024-03-28T22:20:57.023+08:00
[info] TextMessage.Strict({"topic":"orderbook.100.ETH-29MAR24-3575-C","ts":1711635657023,"type":"snapshot","id":"orderbook.100.ETH-29MAR24-3575-C-9136793338-1711635657022","data":{"s":"ETH-29MAR24-3575-C","b":[["46.6","74.7"],["46.5","120"],["45.8","52.1"],["44.2","240"]],"a":[["51.4","120"],["52.5","47.3"],["52.8","34.7"],["53.7","52.1"],["53.8","240"],["81.2","40"],["210","0.2"],["253.2","0.2"]],"u":667407,"seq":9136793338},"cts":1711635654863})
[info] -----------------------
[info] 2024-03-28T22:20:57.023+08:00
[info] TextMessage.Strict({"topic":"orderbook.100.ETH-29MAR24-3575-P","ts":1711635657023,"type":"snapshot","id":"orderbook.100.ETH-29MAR24-3575-P-9136792306-1711635657022","data":{"s":"ETH-29MAR24-3575-P","b":[["39.4","1.6"],["39.3","214"],["38.5","140.4"],["37.2","40"],["37.1","240.8"]],"a":[["44.1","93.6"],["44.2","120.4"],["44.6","40"],["44.9","140.4"],["46.9","240.8"],["100","0.2"],["238.8","0.2"]],"u":402912,"seq":9136792306},"cts":1711635650375})

Then the stream stops. Please help me fix it.

Source code:
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.scaladsl.*
import akka.http.scaladsl.model.*
import akka.http.scaladsl.settings.*
import akka.http.scaladsl.model.ws.*
import org.json4s.*
import org.json4s.native.JsonMethods.*
import com.github.nscala_time.time.Imports.{DateTime, DateTimeZone}
import scala.concurrent.duration._

import scala.concurrent.{Future, Promise}

object WebSocketClientFlow {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    val msg =
      """
        |{
        |    "op": "subscribe",
        |    "args": [
        |        "orderbook.100.ETH-29MAR24-3575-C",
        |        "orderbook.100.ETH-29MAR24-3575-P"
        |    ]
        |}
        |""".stripMargin

    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    def handle(m: Message): Unit =
      m match
        case TextMessage.Strict(txt) =>
          implicit val formats = org.json4s.DefaultFormats

          val jsonVal   = parse(txt)
          val timestamp = (jsonVal \ "ts").extract[Long]
          println(s"""-----------------------
                     |${new DateTime(timestamp).withZone(DateTimeZone.forID("..."))}
                     |$m
                     |""".stripMargin)
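        case x =>
          // Log anything that is not a strict text frame. A `???` here would throw
          // NotImplementedError and fail the sink, tearing down the connection.
          println(s"---> $x")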

    val flow =
      Flow
        .fromSinkAndSourceMat(
          Sink.foreach[Message](handle),
          Source(List(TextMessage(msg))).concatMat(Source.maybe[Message])(Keep.right)
        )(Keep.right)

    val defaultSettings = ClientConnectionSettings(system)
    val customWebsocketSettings =
      defaultSettings.websocketSettings
        .withPeriodicKeepAliveData { () =>
          println("")
          ByteString(s"debug")
        }
    val customSettings =
      defaultSettings.withWebsocketSettings(customWebsocketSettings)

    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(
        request = WebSocketRequest("wss://stream.bybit.com/v5/public/option"),
        clientFlow = flow,
        log = system.log,
        settings = customSettings
      )

    val connected = upgradeResponse.map { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

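    // Register the callback before blocking; the loop below never returns.
    connected.onComplete(x => println(s"====== $x"))

    // Keep the main thread alive so the stream can keep running.
    while true do Thread.sleep(1000)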
  }
}

I don’t see anything there that would lead to the WebSocket being disconnected. Are you sure it is not the server deciding to close it?

Note that the periodicKeepAliveData you configure is only sent if you also set periodic-keep-alive-max-idle / WebSocketSettings#periodicKeepAliveMaxIdle to an actual interval. By default it is disabled.
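A minimal sketch of enabling the keep-alive programmatically, so that the max-idle interval is set alongside the data supplier. The object name KeepAliveSettings, the 20-second interval and the payload are assumptions chosen for illustration, not values from this thread.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.util.ByteString
import scala.concurrent.duration.*

object KeepAliveSettings {
  // Builds client settings whose WebSocket keep-alive is actually active:
  // without a max-idle interval, the keep-alive data supplier is never called.
  def apply(system: ActorSystem): ClientConnectionSettings = {
    val defaults = ClientConnectionSettings(system)
    val ws = defaults.websocketSettings
      .withPeriodicKeepAliveMode("ping")          // "ping" or "pong"
      .withPeriodicKeepAliveMaxIdle(20.seconds)   // assumed interval
      .withPeriodicKeepAliveData(() => ByteString("keepalive")) // assumed payload
    defaults.withWebsocketSettings(ws)
  }
}
```

The result can be passed as the settings argument of Http().singleWebSocketRequest in place of customSettings.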

One thing you could do is attach a watchTermination to (or just before) the message sink and log when it completes or fails. That could help figure out whether the server closes its outgoing stream.
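One way this could look, as a sketch: wrap the handler in a sink that logs how the inbound side ended. InboundLogging and loggingSink are hypothetical names; the handler is whatever function processes each Message.

```scala
import akka.Done
import akka.http.scaladsl.model.ws.Message
import akka.stream.scaladsl.{Flow, Keep, Sink}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object InboundLogging {
  // Returns a sink that runs `handle` on every message and reports
  // whether the inbound stream completed normally or failed.
  def loggingSink(handle: Message => Unit)(implicit ec: ExecutionContext): Sink[Message, Future[Done]] =
    Flow[Message]
      .watchTermination() { (_, done) =>
        done.onComplete {
          case Success(_)  => println("Inbound stream completed")
          case Failure(ex) => println(s"Inbound stream failed: $ex")
        }
        done
      }
      .toMat(Sink.foreach[Message](handle))(Keep.left)
}
```

Placing the hook on the sink side rather than on the whole client flow makes it observe only the messages coming from the server, which is the direction in question here.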

I added those lines to application.conf:

akka.http.client.websocket.periodic-keep-alive-mode = ping
akka.http.client.websocket.periodic-keep-alive-max-idle = 0.1 seconds

I also tried with ping.

And I changed the flow as follows:

    val flow =
      Flow
        .fromSinkAndSourceMat(
          Sink.foreach[Message](handle),
          Source(List(TextMessage(msg))).concatMat(Source.maybe[Message])(Keep.right)
        )(Keep.right)
        .watchTermination() { (prevMatValue, future) =>
          future.onComplete {
            case x =>
              println(s"Terminated: $x")
          }
          prevMatValue
        }

Now I receive 6 messages instead of 4 :) The termination hook reports this error:

Terminated: Failure(akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [stream.bybit.com/<unresolved>:443], no bytes passed in the last 60 seconds)
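The 60 seconds in this error matches the default client idle timeout (akka.http.client.idle-timeout). If protocol-level pings turn out not to reach the server, one workaround to try is a stream-level heartbeat using the keepAlive operator on the outgoing source. This is only a sketch: HeartbeatSource is a hypothetical name, and the heartbeat payload is an assumption that has to be checked against the server's API documentation.

```scala
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Keep, Source}
import scala.concurrent.Promise
import scala.concurrent.duration.*

object HeartbeatSource {
  // Assumed application-level heartbeat; the real format depends on the server.
  val heartbeat: Message = TextMessage("""{"op":"ping"}""")

  // Sends the subscription once, then stays open and injects `heartbeat`
  // whenever nothing has been sent for `every`.
  def outgoing(
      subscribe: String,
      every: FiniteDuration = 20.seconds
  ): Source[Message, Promise[Option[Message]]] =
    Source
      .single[Message](TextMessage(subscribe))
      .concatMat(Source.maybe[Message])(Keep.right)
      .keepAlive(every, () => heartbeat)
}
```

HeartbeatSource.outgoing(msg) would take the place of the Source(...).concatMat(Source.maybe[Message])(Keep.right) expression in the client flow. Because bytes then leave the client regularly, the TCP idle timeout should no longer trigger even when the server is quiet.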

Hmm, it seems the ping is not emitted then, if the connection is killed for being idle. Try enabling akka.log-config-on-start = true to verify that the config actually in use is what you think it is.

You could also try akka.http.client.websocket.log-frames = true and verify that you actually see ping-pong messages going to the server (entries are logged at debug level).
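Since frame entries are logged at debug level, the actor system's log level has to allow debug output as well. A sketch of applying both settings in code, assuming a hypothetical DebugSystem helper and system name; the same two keys could equally go into application.conf.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object DebugSystem {
  // Overrides layered on top of the regular application.conf / reference.conf.
  val overrides: Config = ConfigFactory.parseString(
    """
      |akka.loglevel = DEBUG
      |akka.http.client.websocket.log-frames = true
      |""".stripMargin)

  // Creates an actor system with WebSocket frame logging visible at debug level.
  def create(): ActorSystem =
    ActorSystem("ws-debug", overrides.withFallback(ConfigFactory.load()))
}
```

With the default Akka logger this prints to stdout; when an SLF4J backend is configured, its own level for the akka loggers has to permit debug too.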

I checked the logs with akka.log-config-on-start = true. periodic-keep-alive-mode is applied.

With akka.http.client.websocket.log-frames = true, I don’t see any pings going to the server through the WebSocket.

I have no idea how to fix that.

I’m afraid I don’t have a good idea of where to look further to figure this one out. Maybe try enabling it completely programmatically (websocketsettings.withPeriodicKeepAliveMode("ping")), in case it is still something about picking up the expected config.

I noticed we didn’t have complete test coverage for the WebSocket client ping, but after adding it, it works as expected, with the client emitting ping frames (akka-http#4375).