Trace problem when connecting a websocket

akka
akka-http
websockets
streams

(Andreas Bergmeier) #1

I have a graph that seems to close the stream prematurely.
The code is:

Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  val exasolAnnounce = http
    .webSocketClientFlow(WebSocketRequest(socketUrl))
    .named("ExasolAnnounceLogin")
    .log("SendAnnounce")

  val announceResponse = builder.add(Flow[Message].map(data =>
    LoginCommand.Response.extract(data)
  ).named("Building Response").log("AnnouncementResponse"))
  val loginMessage = TextMessage((new LoginCommand).toJsonString())

  import GraphDSL.Implicits._

  Source.single(loginMessage) ~> exasolAnnounce ~> announceResponse

  SourceShape(announceResponse.out)
})

This produces the following trace against a third-party WebSocket server:

[DEBUG] [09/12/2018 10:44:09.215] [TestSystem-akka.actor.default-dispatcher-5] [akka://TestSystem/system/IO-TCP/selectors/$a/0] Resolving 192.168.56.2 before connecting
[DEBUG] [09/12/2018 10:44:09.252] [TestSystem-akka.actor.default-dispatcher-6] [akka://TestSystem/system/IO-DNS] Resolution request for 192.168.56.2 from Actor[akka://TestSystem/system/IO-TCP/selectors/$a/0#1310905160]
[DEBUG] [09/12/2018 10:44:09.267] [TestSystem-akka.actor.default-dispatcher-6] [akka://TestSystem/system/IO-TCP/selectors/$a/0] Attempting connection to [/192.168.56.2:8563]
[DEBUG] [09/12/2018 10:44:09.271] [TestSystem-akka.actor.default-dispatcher-6] [akka://TestSystem/system/IO-TCP/selectors/$a/0] Connection established to [192.168.56.2:8563]
[DEBUG] [09/12/2018 10:44:09.300] [TestSystem-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://TestSystem/system/StreamSupervisor-0)] [SendAnnounce] Upstream finished.

If I use the echo WebSocket server instead, the stream processes fine.
My question is where the actual problem lies:

  1. Why is the stream getting closed?
  2. Who closes the stream?
  3. Why does this not raise an Exception?
  4. Why am I not getting any data?

(Johannes Rudolph) #2

Hi Andreas,

this line gives the right hint:

[DEBUG] [09/12/2018 10:44:09.300] [TestSystem-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://TestSystem/system/StreamSupervisor-0)] [SendAnnounce] Upstream finished.

It’s your own Source.single which finishes the outgoing side of the websocket client connection after it sent the first message.

There’s a section in the docs about the problem: Half-Closed WebSockets.
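A minimal sketch of the approach that docs section describes, as I understand it: concatenate Source.maybe onto the outgoing Source.single so the outgoing side stays open until a promise is completed. The socketUrl and the login payload below are hypothetical placeholders, not the values from the original post, and ActorMaterializer assumes the Akka 2.5 line that was current for this thread.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Promise

object KeepOutgoingSideOpen {
  implicit val system: ActorSystem = ActorSystem("TestSystem")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical placeholders: substitute the real endpoint and login payload.
  val socketUrl = "ws://localhost:8563"
  val loginMessage: Message = TextMessage("""{"command":"login"}""")

  // Source.single on its own completes right after emitting its element,
  // which half-closes the WebSocket. Concatenating Source.maybe keeps the
  // outgoing side open until the materialized promise is completed.
  val outgoing: Source[Message, Promise[Option[Message]]] =
    Source.single(loginMessage).concatMat(Source.maybe[Message])(Keep.right)

  def main(args: Array[String]): Unit = {
    val (closeOutgoing, done) =
      outgoing
        .via(Http().webSocketClientFlow(WebSocketRequest(socketUrl)))
        .toMat(Sink.foreach[Message](msg => println(s"received: $msg")))(Keep.both)
        .run()

    // Once every expected response has arrived, complete the outgoing side:
    //   closeOutgoing.success(None)
    done.onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

Completing the promise with None finishes the outgoing side without sending another message, so no dummy value is needed.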

Btw. it seems your stream graph is a simple linear one. In that case you don’t need any GraphDSL but can use the simpler style using via to combine Flows:

val exasolAnnounce = http
  .webSocketClientFlow(WebSocketRequest(socketUrl))
  .named("ExasolAnnounceLogin")
  .log("SendAnnounce")

val announceResponse = Flow[Message].map(data =>
    LoginCommand.Response.extract(data)
).named("Building Response").log("AnnouncementResponse")
val loginMessage = TextMessage((new LoginCommand).toJsonString())

val wsSource = 
  Source.single(loginMessage)
    .via(exasolAnnounce)
    .via(announceResponse)

Johannes


(Andreas Bergmeier) #3

Turned out that it was our Firewall :frowning:


(Johannes Rudolph) #4

Maybe it was part of the problem. As this is a race condition there may have been many causes, but the ultimate problem is still the one I linked above :slight_smile:


(Andreas Bergmeier) #5

Thanks for taking the time to explain.

> It’s your own Source.single which finishes the outgoing side of the websocket client connection after it sent the first message.

Interesting.

> Btw. it seems your stream graph is a simple linear one.

Nope. The full stream graph has fan-ins and the like.
Besides, I have not yet read a comprehensive explanation of materialization, so graphs are easier for me to comprehend (I think).

> but the ultimate problem is still the one I linked above :slight_smile:

Correct. Took me a while to understand.

So one workaround could be to put a Source.maybe at the end of the stream and produce a dummy value shortly before? Or how would you prevent the Source from completing until all WebSocket requests are done?
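One possible shape for this inside a GraphDSL graph, offered as a sketch under assumptions rather than a confirmed answer: pass Source.maybe into GraphDSL.create so its promise becomes the materialized value, and feed it into a Concat junction behind the login message. The helper name responsesVia, the URL and the payload are hypothetical; no dummy value is produced, because completing the promise with None simply finishes the outgoing side.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Concat, Flow, GraphDSL, Keep, Sink, Source}

import scala.concurrent.Promise

object KeepOpenGraph {
  implicit val system: ActorSystem = ActorSystem("TestSystem")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical placeholders: substitute the real endpoint and login payload.
  val socketUrl = "ws://localhost:8563"
  val loginMessage: Message = TextMessage("""{"command":"login"}""")

  // Hypothetical helper: sends loginMessage through the given flow and keeps
  // the outgoing side open until the materialized promise is completed.
  def responsesVia(
      webSocketFlow: Flow[Message, Message, Any]
  ): Source[Message, Promise[Option[Message]]] =
    Source.fromGraph(GraphDSL.create(Source.maybe[Message]) { implicit builder => keepOpen =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Message](2))
      val webSocket = builder.add(webSocketFlow)

      Source.single(loginMessage) ~> concat.in(0) // emitted first
      keepOpen.out ~> concat.in(1)                // never completes on its own
      concat.out ~> webSocket.in

      SourceShape(webSocket.out)
    })

  def main(args: Array[String]): Unit = {
    val (closeOutgoing, done) =
      responsesVia(Http().webSocketClientFlow(WebSocketRequest(socketUrl)))
        .toMat(Sink.foreach[Message](msg => println(s"received: $msg")))(Keep.both)
        .run()

    // When all WebSocket requests are done: closeOutgoing.success(None)
    done.onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

The promise could be completed from whatever stage decides that the last response has been seen; how that decision is made depends on the protocol and is left open here.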