Acknowledging PING from Server in the middle of a stream

I’m implementing a stream from a Twitch channel’s IRC server over a websocket. The Flow is in place and the Sink prints the incoming messages to the console, so far so good.
The problem now is that the IRC server periodically sends a PING message to confirm I’m still alive. Matching against the text in the Sink is easy enough, but how can I send the PONG back to the IRC server from the Sink’s match clause?

Here is the code I got so far:


import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory

import scala.concurrent.{ExecutionContext, Future}

object Application extends App {

  implicit val system: ActorSystem = ActorSystem("default")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ex: ExecutionContext = system.dispatcher

  val config = ConfigFactory.defaultApplication()
  val wsConfig = config.getConfig("twitch-irc")
  val endpoint = Uri(wsConfig.getString("endpoint"))
  val token = wsConfig.getString("token")
  val nickname = wsConfig.getString("nickname")
  val channel = wsConfig.getString("channel")

  val sink: Sink[Message, Future[Done]] =
    Flow[Message]
      .mapAsync(4) {
        // would match the PING message here
        case message: TextMessage.Strict =>
          println(message.text)
          Future.successful(Done)
        case message: TextMessage.Streamed =>
          message.textStream.runForeach(println)
        case message: BinaryMessage =>
          message.dataStream.runWith(Sink.ignore)
      }
      .toMat(Sink.last)(Keep.right)

  val cmds = Seq(
    TextMessage(
      "CAP REQ :twitch.tv/tags twitch.tv/commands twitch.tv/membership"
    ),
    TextMessage(s"PASS $token"),
    TextMessage(s"NICK $nickname"),
    TextMessage(s"USER $nickname 8 * :$nickname"),
    TextMessage(s"JOIN #$channel")
  )

  val source =
    Source(cmds.to[scala.collection.immutable.Seq])
      .concatMat(Source.maybe)(Keep.both)

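  // defaultSettings and wsSettings are not defined in this snippet;
  // drop the `settings` argument to use the default client settings
  val myFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(
      WebSocketRequest(endpoint),
      settings = defaultSettings.withWebsocketSettings(wsSettings)
    )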

  val ((completionPromise, upgradeResponse), closed) = source
    .viaMat(myFlow)(Keep.both)
    .toMat(sink)(Keep.both)
    .run()

  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(
        s"Connection failed: ${upgrade.response.status}"
      )
    }
  }
  connected.onComplete(println)
  closed.foreach(_ => {
    println("closed")
    system.terminate()
  })
}


Hi!

There is a shape called BidiFlow. It is mostly used for encode/decode-style stages (like zip/unzip), but it can be used in your case too: short-circuit the PING and generate an ad-hoc response. I think you can LEGO this together from a Partition, a map, and a Merge.

https://doc.akka.io/docs/akka/2.5/stream/stream-graphs.html#bidirectional-flows


I tried to understand bidi flows, and even plain flows. Right now it all seems like too much for such a simple use case.

Try something like this:

import org.scalatest.{Matchers, WordSpecLike}

class BidiDraft extends WordSpecLike with Matchers {

  import akka.actor.ActorSystem
  import akka.stream.{ActorMaterializer, BidiShape}
  import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}

  import scala.concurrent.{ExecutionContext, Await}
  import concurrent.duration._
  import concurrent.Future
  import scala.collection.immutable

  implicit val system: ActorSystem = ActorSystem()
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val mat = ActorMaterializer()

  def pingShortcut = {

    def pingPartition(s: String) = {
      if(s == "PING") 1
      else 0
    }

    def pingTransformer(s: String) = {
      "PONG"
    }

    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val part = builder.add(Partition[String](2, pingPartition))
      val merge = builder.add(Merge[String](2))
      val transf = builder.add(Flow[String].map(pingTransformer))

      part.out(1) ~> transf ~> merge.in(1)

      BidiShape(part.in, part.out(0), merge.in(0), merge.out)
    }
  }


  "pingShortcut" should {
    "convert pings to pongs" in {
      var upstreamCollect = List.empty[String]
      val data = List("a", "PING", "test", "test2", "PING")
      val socketMock = Flow.fromSinkAndSourceMat(Sink.seq[String], Source(data))((a,b) => a)
      val upstreamLogicMock = Flow[String].map(x => {upstreamCollect = x :: upstreamCollect; x})

      val result: Future[immutable.Seq[String]] = socketMock.join(pingShortcut).join(upstreamLogicMock).run()
      Await.result(result, 1.second) shouldBe List("a", "PONG", "test", "test2", "PONG")
      upstreamCollect.reverse shouldBe List("a", "test", "test2")
    }
  }
}

In your case the upstreamLogic is the sink and source you built (you can turn them into a flow with Flow.fromSinkAndSource, like I did in the socketMock). The socketMock in your case is the Flow returned from the websocket creation. You need to refactor the Strings to Messages or whatever you have, and you can also use the source.via(socket.join(shortcut)).to(sink) form. Try to click into the stages and read the comments for more info, and ask here if you still have questions.
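
One thing to watch when you move from the toy Strings to real IRC lines: a PING usually carries a payload (something like `PING :tmi.twitch.tv`) and may end with a line break, so an exact `== "PING"` comparison will not match any more. Below is a minimal sketch of the predicate and the reply on plain Strings. The names `IrcPing` and `pongFor` are made up for illustration, and echoing the payload back is my assumption about what the server expects, so check it against the server you talk to.

```scala
object IrcPing {
  // Returns the PONG reply for a PING line, or None for any other line.
  // Assumption: the server sends "PING <payload>" and accepts "PONG <payload>".
  def pongFor(line: String): Option[String] = {
    val trimmed = line.trim // drop a trailing \r\n, if any
    if (trimmed.startsWith("PING ")) Some("PONG " + trimmed.drop("PING ".length))
    else None
  }

  // Same contract as pingPartition above: 1 = PING branch, 0 = everything else.
  def pingPartition(line: String): Int =
    if (pongFor(line).isDefined) 1 else 0
}
```

Matching on the start of the line (instead of `contains`) also avoids treating a chat message that merely quotes a PING as a real one.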


So I took your example and combined it with my original code, and it worked: the PONG message is sent back whenever a PING is received.

Code:


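// Assumes the imports and config values (endpoint, ...) from the first
// snippet, plus BidiShape for the custom graph shape
import akka.stream.BidiShape

object Application extends App {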
  implicit val system: ActorSystem = ActorSystem()
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val mat = ActorMaterializer()

  def pingShortcut = {

    def pingPartition(s: Message) = s match {
      case TextMessage.Strict(text) if text.contains("PING :tmi.twitch.tv") =>
        1 // a PING goes to output 1 of the Partition
      case _ => 0 // anything else goes to output 0 (the default)
    }

    // whatever PING arrives, the reply is always the same PONG
    def pingTransformer(s: Message): Message = {
      TextMessage("PONG :tmi.twitch.tv")
    }

    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val part = builder.add(Partition[Message](2, pingPartition))
      val merge = builder.add(Merge[Message](2))
      val transf = builder.add(Flow[Message].map(pingTransformer))
      // output 1 of the Partition (the PINGs) -> map to PONG -> merge into the ws output
      part.out(1) ~> transf ~> merge.in(1)

      BidiShape(part.in, part.out(0), merge.in(0), merge.out)
    }
  }

  // goes to ws (`commands` is the immutable Seq of login messages, not shown here)
  val source = Source(commands).concatMat(Source.maybe)(Keep.both)
  // receives from ws
  val sink: Sink[Message, Future[Done]] =
    Flow[Message]
      .mapAsync(4) {
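        case message: TextMessage.Strict =>
          println("Strict")
          println(s"${now()} - ${message.text}") // now() is a timestamp helper not shown here
          Future.successful(Done)
        case message: TextMessage.Streamed =>
          println("Streamed")
          // print each chunk; building the string alone would discard it
          message.textStream.runForeach(x => println(s"${now()} - $x"))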
        case message: BinaryMessage =>
          message.dataStream.runWith(Sink.ignore)
      }
      .toMat(Sink.last)(Keep.right)

  val flow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest(endpoint))

  val ((completionPromise, upgradeResponse), closed) = source
    // join the websocket flow with the shortcut: messages that match the
    // partition condition (output 1) are sent straight back to the ws,
    // everything else continues to the sink
    .viaMat(flow.join(pingShortcut))(Keep.both)
    .toMat(sink)(Keep.both)
    .run()

  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(
        s"Connection failed: ${upgrade.response.status}"
      )
    }
  }
  connected.onComplete(println)
  closed.foreach(_ => {
    println("closed")
    system.terminate()
  })

}

This is how I understand it so far:

First I open the websocket flow. A websocket can both send and receive data, and so can this flow.
A source is the data I want to send through this flow to the websocket server (in this case just a static list, not a stream), and a sink is the destination of the incoming data (which can go on to more processing, and can back-pressure).

Since I need to reverse the direction when the message is a PING, I create this BidiFlow. In its graph I split the stream into a number of partitions identified by integer ids (1 and 0). Every element passes through the partition function, which (by pattern matching, in my case) returns the id of the partition the element should go to, so 1 is my PING partition and 0 is the default one.

Then, using Merge, I connect the output of partition 1 to the input of the websocket, and the default partition goes on to the sink.

Something like that?

And BTW, Thank you very very much!!

Yep! I usually draw ASCII art for fellow developers when the use case/graph is not easy to understand. (BTW, we always draw our graphs at least once on a whiteboard or paper if they contain non-basic elements, i.e. anything beyond sink, source, and flow.) For example:

+-------------+            +----------------------------------+             +--------------+
|             |            |        PingShortcut              |             |              |
|             |            |                   +---------+    |             |   Source     |
|             |            |                   |         <------------------+              |
|             <--------------------------------+ Merge   |    |             |              |
|             |            |                   |         <-+  |             +--------------+
|             |            |                   +---------+ |  |
|             |            |               +---------+     |  |
|  Websocket  |            |               |         |     |  |
|             |            |               |  Map()  +-----+  |
|             |            |  +--------+--->         |        |             +--------------+
|             |            |  |        |   +---------+        |             |              |
|             +---------------> Parti  +--------------------> +-----------> |    Sink      |
|             |            |  | tion   |                      |             |              |
|             |            |  +--------+                      |             |              |
+-------------+            +----------------------------------+             +--------------+

Made with http://asciiflow.com/
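
If it helps, the routing in the drawing can also be written down with plain collections, no Akka involved. This is only a sketch to show which message ends up where: `RoutingSketch` and `route` are illustrative names, the lists stand in for the stream stages, and a real Merge interleaves elements as they arrive instead of appending the PONGs at the end.

```scala
object RoutingSketch {
  // Returns (what reaches the Sink, what is sent to the websocket).
  def route(fromSocket: List[String],
            fromSource: List[String]): (List[String], List[String]) = {
    // Partition: PINGs take the shortcut, everything else goes on to the Sink
    val (pings, others) = fromSocket.partition(_.startsWith("PING"))
    // Map(): turn every PING into the matching PONG
    val pongs = pings.map(p => "PONG" + p.drop("PING".length))
    // Merge: the Source's messages and the PONGs both go to the websocket
    (others, fromSource ++ pongs)
  }
}
```

For example, `RoutingSketch.route(List("a", "PING :x", "b"), List("NICK me"))` gives `(List("a", "b"), List("NICK me", "PONG :x"))`.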


Nicee, I’ll try that tool for further drawing my flows!
Thank you very very much!! <3