Websocket messages

Hi guys,

I’m fairly new to both websockets & Akka. I was trying to use websockets to get some data coming from a sever. I wanted to print the incoming messages from the server.
I used:

 val (upgradeResponse, done) = Http().singleWebSocketRequest(WebSocketRequest("ws://ws-sandbox.coinapi.io/v1/"), flow)

& inspected the upgrade response that came back, which was all good.
I got:
101 Switching Protocols
Connection: upgrade
Upgrade: websocket
Sec-Websocket-Accept
as well as some custom headers that the API documentation mentions it will give back.

So it is all good.
However I’m not sure how to get the rest of the messages that start flowing through after the handshake has been successful.

I took a look at the sandbox: https://app.gosandy.io/?url=https://raw.githubusercontent.com/coinapi/coinapi-sdk/master/data-api/sandy-ws-v1.json&u=Ar
after providing the same hello message that my simple Akka application produces the messages start arriving rapidly within this sandbox.

But I’m not sure how to express this in code. Would someone be able to help with this please?

Regards,
Dips.

This runnable client/server roundtrip example shows different ways to use Websockets with akka-http:

The singleWebSocketRequestClient is probably what you are looking for.

Hope that helps
Paul

1 Like

Thanks for sharing the example, @pbernet.

Thank you very much for the link to this @pbernet I have just gone through the talk as well as the first two parts of the 4 part blog series. I’ve cloned the repository & found a lot of helpful things in there, especially when it comes to creating the server.

However the code for the singleWebSocketRequestClient is very similar to what I already have.
My apologies I should have posted more than just the line to connect:

    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case TextMessage.Strict(text) => println(s"got text: $text")
        case TextMessage.Streamed(textStream) => textStream.runFold("")(_ + _).onComplete(value => println(s"got streamed message: ${value.get}"))
        case _ => println("something")
      }

    val helloMessage: HelloMessage = HelloMessage("hello", CoinApiConstants.apiKey, heartbeat = false, Seq("trade"), Seq("BITSTAMP_SPOT_BTC_USD$", "COINBASE_", "ITBIT_"))
    val helloSource: Source[Message, NotUsed] = Source.single(TextMessage(helloMessage.asJson.toString()))

val flow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.right)

val (upgradeResponse, closed) = Http().singleWebSocketRequest(WebSocketRequest(CoinApiConstants.nonEncryptedAddress), flow)

val connected = upgradeResponse.map { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

For clarity the HelloMessage class is just a case class that is converted to Json & sent to the server:

import io.circe.generic.extras.{ Configuration, ConfiguredJsonCodec }

@ConfiguredJsonCodec case class HelloMessage(`type`: String, apikey: String, heartbeat: Boolean, subscribeDataType: Seq[String], subscribeFilterSymbolId: Seq[String])

object HelloMessage {
  implicit val helloMessageConfig: Configuration = Configuration.default.withSnakeCaseMemberNames
}

My original problem was that none of the messages coming from the server would be printed after a successful handshake.

To quote the api:

If everything is correct, we will provide you with a continuous stream of real-time market data updates.

I’ve checked the connection settings & everything is still open. So I’m not sure why the messages aren’t being printed to be honest.

To my knowledge, the flow is still reusable & as long as data would still be coming through it would be hitting the code for the sink … I had put the catch all as the first case as well just in case there might have been something funny with the messages but nothing was printed in this case either.

@jrudolph perhaps there is something that I’ve not quite understood ?

Regards,
Dips

I guess that using Source.single might be the reason because it will try to (half-close) the WS connection after sending that first message.

Instead, you can try using either a Source.queue for submitting elements or use something like Source.single(...).concat(Source.maybe) which will keep the connection open from your side indefinitely.

Thank you very much for the suggestion @jrudolph - I had actually tried this last night. I was reading up on the documentation where it was suggested Client-Side WebSocket Support • Akka HTTP

I came up with this:

    val helloMessage: HelloMessage = HelloMessage("hello", CoinApiConstants.apiKey, heartbeat = false, Seq("trade"), Seq("BITSTAMP_SPOT_BTC_USD$", "BITFINEX_SPOT_BTC_LTC$", "COINBASE_", "ITBIT_"))
    val helloSource: Source[Message, Promise[Option[Message]]] = Source.single(TextMessage(helloMessage.asJson.toString())).concatMat(Source.maybe[Message])(Keep.right)

val flow: Flow[Message, Message, Promise[Option[Message]]] = Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.right)

    val (upgradeResponse, promise) = Http().singleWebSocketRequest(WebSocketRequest(CoinApiConstants.nonEncryptedAddress), flow)

    println("something ...")
    promise.success(None)

With the .concatMat(Source.maybe[Message]) appended on the end as you have suggested above.
But again I found that nothing was coming through.
So I noticed that I need to reply with Pong messages as well otherwise the sever will cut the connection.
So I went ahead & also added some configuration:

akka.http {
    client {
        websocket {
            log-frames = true
            periodic-keep-alive-mode = pong
        }
    }
}

However there hasn’t been any logging or anything coming through since then.

Can you enable akka.http.client.log-unencrypted-network-bytes=100 and akka.http.client.websocket.log-frames=true and send the output? That should show whether and which messages arrive over the network.

As soon as you make this call you might close the connection. Is that what you want?

correct the promise.success(None) will close the connection. That is just something I added afterwards.

Wow the configuration you have provided has worked … messages are flodding in:

Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.8806960Z","time_coinapi":"2021-10-07T09:02:38.9422823Z","uuid":"d07a1a2d-0a15-41c5-a1da-4d755e3e98bc","price":3586.56,"size":1.05,"taker_side":"BUY","symbol_id":"COINBASE_SPOT_ETH_USD","sequence":446198,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.8893030Z","time_coinapi":"2021-10-07T09:02:38.9604117Z","uuid":"900a917b-3b53-4f43-b194-9a22c1633d89","price":0.7188,"size":16.3,"taker_side":"SELL","symbol_id":"COINBASE_SPOT_FET_USDT","sequence":36,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.9162370Z","time_coinapi":"2021-10-07T09:02:38.9786385Z","uuid":"1d038288-6dbd-4990-b2bf-62dffc7d1760","price":26.005,"size":15.278058,"taker_side":"SELL","symbol_id":"COINBASE_SPOT_UNI_USD","sequence":432,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.8900600Z","time_coinapi":"2021-10-07T09:02:38.9604421Z","uuid":"0fd30d13-77e7-47f8-8bb4-b4d522ae5a05","price":3586.56,"size":1.75,"taker_side":"BUY","symbol_id":"COINBASE_SPOT_ETH_USD","sequence":446199,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.9462740Z","time_coinapi":"2021-10-07T09:02:39.0137320Z","uuid":"b5432bb2-e579-4c28-9538-b35e58e20f4b","price":0.00004327,"size":430,"taker_side":"BUY","symbol_id":"COINBASE_SPOT_ADA_BTC","sequence":171,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.9462740Z","time_coinapi":"2021-10-07T09:02:39.0137470Z","uuid":"7ec9c452-6455-4887-bf2a-1da906c8632e","price":0.00004327,"size":7.15,"taker_side":"BUY","symbol_id":"COINBASE_SPOT_ADA_BTC","sequence":172,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.9162610Z","time_coinapi":"2021-10-07T09:02:38.9785219Z","uuid":"ca1cec0d-4797-4b21-ba5c-e1a5a464936b","price":3586.56,"size":0.5,"taker_side":"BUY","symbol_id":"COINBASE_SPOT_ETH_USD","sequence":446200,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.9268650Z","time_coinapi":"2021-10-07T09:02:38.9909299Z","uuid":"8ce60b80-e525-49e7-8b60-0f68c6ed903f","price":3586.56,"size":0.9,"taker_side":"BUY","symbol_id":"COINBASE_SPOT_ETH_USD","sequence":446201,"type":"trade"}
Client received TextMessage.Strict: {"time_exchange":"2021-10-07T09:02:38.9437090Z","time_coinapi":"2021-10-07T09:02:39.0082714Z","uuid":"a6f39997-f94c-4ab0-810b-d9967e260c6b","price":3586.56,"size":0.25,"taker_side":"BUY","symbol_id":"COINBASE_SPOT_ETH_USD","sequence":446202,"type":"trade"}
akka.http.client.log-unencrypted-network-bytes=100
akka.http.client.websocket.log-frames=true

After adding this configuration, I can see the messages being printed !

It is quite strange because hte Sink that I have:

val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case TextMessage.Strict(text) => println(s"Client received TextMessage.Strict: $text")
        case TextMessage.Streamed(textStream) => textStream.runFold("")(_ + _).onComplete(value => println(s"Client received TextMessage.Streamed: ${value.get}"))
        case _ => println("something")
      }

I am assuming will catch the TextMessage.Strict(text) & print those messages?

My apologies @jrudolph … that is from the sink ! (smh).
Thank you very much for that configuration & helping me with this ! I really do appreciate it :slight_smile: