Akka Typed with Akka HTTP WS

Hello there,

I am running into an error while trying to set up an Akka HTTP WebSocket (server side) using Akka Typed.

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

import scala.util.{Failure, Success, Try}

//Akka Typed code...
object WSJSONMessageService {
  sealed trait Command
  case class IncomingMessage(v: Message) extends Command
  case class IncomingJSONMessage(v: JValue) extends Command
  case class OutgoingJSONMessage(v: JValue) extends Command
  case class OutgoingActorRef(ref: ActorRef[TextMessage]) extends Command
  case class ReportError(err: Throwable) extends Command
  case object Stop extends Command

  def apply(): Behavior[Command] = handleMessages(null)

  private def handleMessages(outgoingHandler: ActorRef[TextMessage]): Behavior[Command] = Behaviors.setup { ctx =>
    Behaviors.receiveMessage {
      case IncomingJSONMessage(v) =>
        ctx.log.info(s"Incoming message: ${compact(v)}")
        //todo: give this message to some useful actor.
        ctx.self ! OutgoingJSONMessage(v) //hack for echo message!
        Behaviors.same

      case IncomingMessage(TextMessage.Strict(text)) =>
        Try(parse(text)) match {
          case Success(input) => ctx.self ! WSJSONMessageService.IncomingJSONMessage(input)
          case Failure(exception) => ctx.log.error(exception, s"Error while processing: '$text'")
        }
        Behaviors.same

      case IncomingMessage(_) =>
        Behaviors.same

      case OutgoingJSONMessage(v) =>
        ctx.log.info(s"Outgoing message: ${compact(v)}")
        if (outgoingHandler == null) {
          ctx.log.warning(s"Outgoing message handler not set. Skipping: ${compact(v)}")
        } else {
          //only send once a handler has been registered; sending to null would throw an NPE
          outgoingHandler ! TextMessage(compact(v))
        }
        Behaviors.same

      case OutgoingActorRef(ref) =>
        ctx.log.info(s"Registration of outgoing handler")
        handleMessages(ref)

      case Stop =>
        ctx.log.info("Stopping actor!")
        Behaviors.stopped

      case ReportError(th) =>
        ctx.log.error(th, "Error occurred while receiving message from WS")
        Behaviors.stopped
    }
  }
}

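//WS code to return a Flow...
import akka.NotUsed
import akka.actor.ActorContext
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Source}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
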
private def wsJSONMessageHandler()(implicit context: ActorContext, mat: ActorMaterializer): Flow[Message, Message, Any] = {
    import akka.actor.typed.scaladsl.adapter._
    val actorRef = context.spawnAnonymous(WSJSONMessageService())

    import akka.stream.typed.scaladsl._
    val incoming = Flow[Message].mapAsync(1)(_.removeStream(5 minutes))
      .collect{case v: Message => WSJSONMessageService.IncomingMessage(v)}
      .to(ActorSink.actorRef[WSJSONMessageService.Command](actorRef, WSJSONMessageService.Stop, WSJSONMessageService.ReportError))

    //This outgoing ActorSource fails with the RuntimeException declared in its failure matcher.
    /*val outgoing = ActorSource.actorRef[Message]({ case v =>}, { case input =>
      new RuntimeException(s"error in ${input}")
    }, 10, OverflowStrategy.dropHead)
      .mapMaterializedValue(v => actorRef ! WSJSONMessageService.OutgoingActorRef(v))*/

    val outgoing = Source.actorRef[Message](10, OverflowStrategy.dropHead)
      .mapMaterializedValue(v => actorRef ! WSJSONMessageService.OutgoingActorRef(v))

    Flow.fromSinkAndSource(incoming, outgoing)
  }

  implicit class RichMessage(value: Message) {
    def removeStream(duration: FiniteDuration)(implicit mat: Materializer):Future[Any] = {
      value match {
        case v:TextMessage => v.toStrict(duration)
        case v:BinaryMessage => v.toStrict(duration)
        case _ => Future.successful(NotUsed)
      }
    }
  }

I can successfully create an HTTP route for the code above using the handleWebSocketMessages directive. The incoming Sink already uses Akka Typed and works fine.
The outgoing Source does not work with ActorSource (shown commented out above): the stream fails with new RuntimeException(s"error in ${input}"). A similar Source from the untyped API works as expected (shown above).

Please advise on what I might be missing.

Thanks,
Muthu

I was able to resolve it by changing the commented code above to the following…

val outgoing = ActorSource.actorRef[Message]({ case TextMessage.Strict("close") => }, { case null =>
      new RuntimeException("Null cannot be sent")
    }, 10, OverflowStrategy.dropHead)
      .mapMaterializedValue(v => actorRef ! WSJSONMessageService.OutgoingActorRef(v))

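The problem was that I didn’t pay attention to the documentation :grin:, which states that the first two parameters of ActorSource.actorRef are matchers (partial functions) for the completion and failure messages. My catch-all patterns matched every message, so the source failed as soon as anything was sent to it. With the narrower matchers it works as intended.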
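To illustrate why the catch-all patterns in the commented-out code failed, here is a minimal sketch that uses only the Scala standard library. It assumes the source checks each incoming message against both matchers (for example via isDefinedAt) and only emits it downstream when neither is defined; the exact order of the checks inside Akka is not modelled. Msg, MatcherDemo and isEmitted are hypothetical names standing in for Message and the real stream logic.

```scala
object MatcherDemo {
  // Hypothetical stand-in for akka.http.scaladsl.model.ws.Message.
  final case class Msg(text: String)

  // Catch-all matchers, as in the commented-out code: defined for every message.
  val catchAllCompletion: PartialFunction[Msg, Unit] = { case _ => () }
  val catchAllFailure: PartialFunction[Msg, Throwable] = {
    case input => new RuntimeException(s"error in $input")
  }

  // Narrow matchers, as in the fix: defined only for the control values.
  val closeOnly: PartialFunction[Msg, Unit] = { case Msg("close") => () }
  val nullOnly: PartialFunction[Msg, Throwable] = {
    case null => new RuntimeException("Null cannot be sent")
  }

  // Assumption: a message is passed downstream only if neither matcher is defined for it.
  def isEmitted(completion: PartialFunction[Msg, Unit],
                failure: PartialFunction[Msg, Throwable])(m: Msg): Boolean =
    !completion.isDefinedAt(m) && !failure.isDefinedAt(m)
}
```

Under that assumption, an ordinary message such as Msg("hello") is never emitted with the catch-all pair, while the narrow pair lets it through and reserves Msg("close") as the completion signal.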
I still don’t know how to:
a. Do something similar for regular HTTP (a route that calls complete on a Future)?
b. For the one-actor-per-WS-connection model, how do I pass the typed context around? Is it recommended to pass the context around at all? Or should I have an actor that owns a factory to create and manage the lifecycle of these actors? If so, how would I create one on demand, asynchronously, using Akka Typed?

Thank you for your attention, and please feel free to suggest any tweaks I could make to the above code.

Please advise,
Muthu

More updates / questions…
The changes to the Akka HTTP WS code work, but I am unable to find a good way to test them.
Since the WS part of the system is backed by Akka Typed, I could use ScalaTestWithActorTestKit for testing. However, ScalatestRouteTest uses an untyped actor system. How can I use the two together?

Please advise,
Muthu

For lack of a better understanding, I created the following trait…

trait ScalatestTypedActorHttpRoute extends ScalatestRouteTest { this: Suite =>
  var typedTestKit:ActorTestKit = _ //val init causes createActorSystem() to cause NPE when typedTestKit.system is called.
  implicit def timeout: Timeout = typedTestKit.timeout
  implicit def scheduler: Scheduler = typedTestKit.scheduler

  protected override def createActorSystem(): ActorSystem = {
    typedTestKit = ActorTestKit(ActorTestKitBase.testNameFromCallStack())
    typedTestKit.system.toUntyped
  }

  override def cleanUp(): Unit = {
    super.cleanUp()
    typedTestKit.shutdownTestKit()
  }
}

With this, I am able to…

class WSInvalidInputSpec extends FlatSpec with Matchers with ScalatestTypedActorHttpRoute {
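  implicit val mat: ActorMaterializer = ActorMaterializer()
  //timeout and scheduler are already declared in ScalatestTypedActorHttpRoute, so they need the override modifier here
  override implicit val timeout: Timeout = Timeout(5 minutes)
  override implicit val scheduler: Scheduler = system.scheduler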
  val wsClient = WSProbe()

  val tokenService = typedTestKit.spawn(MyTypedActor("favParam"))

  "Invalid request to WS" should "be handled correctly" in {
    WS("/ws", wsClient.flow) ~> MyRoutes.mySimpleWSRoute ~>
     check {
       isWebSocketUpgrade shouldEqual true

       wsClient.sendMessage("test")
       (toJSON(wsClient.expectMessage()) \ "message-type").shouldBe(JString("BadRequestMessage"))

       wsClient.sendMessage("{\"abc\": \"cde\"}")
       (toJSON(wsClient.expectMessage()) \ "message-type").shouldBe(JString("UnsupportedRequestMessage"))

       wsClient.sendCompletion()
     }
  }
}

The trait code doesn’t seem clean, but it works in my simple test. Any help is appreciated.

Thanks,
Muthu