I’m trying to connect to WebSocket incoming infinite stream as described here. I see only couple of messages
[info] -----------------------
[info] 2024-03-28T22:20:57.023+08:00
[info] TextMessage.Strict({"topic":"orderbook.100.ETH-29MAR24-3575-C","ts":1711635657023,"type":"snapshot","id":"orderbook.100.ETH-29MAR24-3575-C-9136793338-1711635657022","data":{"s":"ETH-29MAR24-3575-C","b":[["46.6","74.7"],["46.5","120"],["45.8","52.1"],["44.2","240"]],"a":[["51.4","120"],["52.5","47.3"],["52.8","34.7"],["53.7","52.1"],["53.8","240"],["81.2","40"],["210","0.2"],["253.2","0.2"]],"u":667407,"seq":9136793338},"cts":1711635654863})
[info] -----------------------
[info] 2024-03-28T22:20:57.023+08:00
[info] TextMessage.Strict({"topic":"orderbook.100.ETH-29MAR24-3575-P","ts":1711635657023,"type":"snapshot","id":"orderbook.100.ETH-29MAR24-3575-P-9136792306-1711635657022","data":{"s":"ETH-29MAR24-3575-P","b":[["39.4","1.6"],["39.3","214"],["38.5","140.4"],["37.2","40"],["37.1","240.8"]],"a":[["44.1","93.6"],["44.2","120.4"],["44.6","40"],["44.9","140.4"],["46.9","240.8"],["100","0.2"],["238.8","0.2"]],"u":402912,"seq":9136792306},"cts":1711635650375})
Then the stream stops. Please, help to fix it.
source code
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.scaladsl.*
import akka.http.scaladsl.model.*
import akka.http.scaladsl.settings.*
import akka.http.scaladsl.model.ws.*
import org.json4s.*
import org.json4s.native.JsonMethods.*
import com.github.nscala_time.time.Imports.{DateTime, DateTimeZone}
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
object WebSocketClientFlow {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
import system.dispatcher
val msg =
"""
|{
| "op": "subscribe",
| "args": [
| "orderbook.100.ETH-29MAR24-3575-C",
| "orderbook.100.ETH-29MAR24-3575-P"
| ]
|}
|""".stripMargin
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
def handle(m: Message): Unit =
m match
case TextMessage.Strict(txt) =>
implicit val formats = org.json4s.DefaultFormats
val jsonVal = parse(txt)
val timestamp = (jsonVal \ "ts").extract[Long]
println(s"""-----------------------
|${new DateTime(timestamp).withZone(DateTimeZone.forID("..."))}
|$m
|""".stripMargin)
case x =>
println(s"---> $x")
???
val flow =
Flow
.fromSinkAndSourceMat(
Sink.foreach[Message](handle),
Source(List(TextMessage(msg))).concatMat(Source.maybe[Message])(Keep.right)
)(Keep.right)
val defaultSettings = ClientConnectionSettings(system)
val customWebsocketSettings =
defaultSettings.websocketSettings
.withPeriodicKeepAliveData { () =>
println("")
ByteString(s"debug")
}
val customSettings =
defaultSettings.withWebsocketSettings(customWebsocketSettings)
val (upgradeResponse, closed) =
Http().singleWebSocketRequest(
request = WebSocketRequest("wss://stream.bybit.com/v5/public/option"),
clientFlow = flow,
log = system.log,
settings = customSettings
)
val connected = upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
while true do Thread.sleep(1000)
connected.onComplete(x => println(s"====== $x"))
}
}