Testing Akka HTTP (0.10.1) Host-Level Client-Side API in an actor with ScalaTestWithActorTestKit

I have a simple actor functioning as a proxy to a model serving service.

object ModelService {

  sealed trait Command extends NoSerializationVerificationNeeded

  sealed trait Request[R <: Reply] extends Command {
    def replyTo: ActorRef[R]
  }

  case class GetPrediction(replyTo: ActorRef[Reply], htmlInput: String, probabilityThreshold: Float)
      extends Request[Reply]

  sealed trait Reply extends NoSerializationVerificationNeeded

  case object ModelOffline extends Reply
  case class Prediction(probIndexVec: Vector[(Float, Int)]) extends Reply
  case class ModelError(error: String) extends Reply

//   case class Context(replyTo: ActorRef[Reply])
  def apply(modelServiceHost: String, port: Int, path: String): Behavior[Command] = {

    val QueueSize = 10

    Behaviors.setup { context =>
      implicit val system = context.system.toClassic
      import system.dispatcher // to get an implicit ExecutionContext into scope

      val poolClientFlow =
        Http()(system).cachedHostConnectionPool[ActorRef[Reply]](modelServiceHost, port)

      def createRequest(predictionCommand: GetPrediction): (HttpRequest, ActorRef[Reply]) = ???

      def parseResponse(response: HttpResponse): Either[ModelError, Prediction] = ???

      val queue = Source
        .queue[GetPrediction](QueueSize, OverflowStrategy.dropNew)
        .map(createRequest)
        .via(poolClientFlow)
        .to(Sink.foreach({
          case (Success(resp), replyTo) => parseResponse(resp).fold(replyTo ! _, replyTo ! _)
          case (Failure(e), replyTo)    => replyTo ! ModelError("failed to get a response from the model service")
        }))
        .run()

      Behaviors.receiveMessage {
        case cmd @ GetPrediction(replyTo, htmlInput, probabilityThreshold) =>
          queue.offer(cmd)
          Behaviors.same

      }

    }
  }

}

when I want to test it in this

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import com.typesafe.config.ConfigFactory

class ModelServiceSpec extends ScalaTestWithActorTestKit() with AnyWordSpecLike with BeforeAndAfterAll with Matchers {

  override def afterAll(): Unit = testKit.shutdownTestKit()

  "the model service" when {

    val modelService = testKit.spawn(ModelService("127.0.0.1", 8080, "/model"), "model-service")
    val probe = testKit.createTestProbe[ModelService.Reply]()

    "a valid request" should {

      "get response from the model-serving server" in {
        modelService ! ModelService.GetPrediction(probe.ref, "this is a test scala question", 0.7f)
        probe.expectMessage(ModelService.ModelError("still testing"))
      }

    }

  }

}

I saw

[2020-11-20 22:08:17,219] [DEBUG] [akka.remote.artery.Decoder] [] [ModelServiceSpec-akka.actor.default-dispatcher-5] - Decoded message but unable to record hits for compression as no remoteAddress known. No association yet? {akkaAddress=akka://ModelServiceSpec@127.0.0.1:2551, sourceThread=ModelServiceSpec-akka.remote.default-remote-dispatcher-11, akkaSource=Decoder(akka://ModelServiceSpec), sourceActorSystem=ModelServiceSpec, akkaTimestamp=21:08:17.218UTC}
[2020-11-20 22:08:17,219] [WARN] [akka.remote.artery.InboundHandshake$$anon$2] [] [ModelServiceSpec-akka.actor.default-dispatcher-5] - Dropping Handshake Request from [akka://ModelServiceSpec@127.0.0.1:2551#1138052464865087236] addressed to unknown local address [akka://nt-ui@127.0.0.1:2551]. Local address is [akka://ModelServiceSpec@127.0.0.1:2551]. Check that the sending system uses the same address to contact recipient system as defined in the 'akka.remote.artery.canonical.hostname' of the recipient system. The name of the ActorSystem must also match. {akkaAddress=akka://ModelServiceSpec@127.0.0.1:2551, sourceThread=ModelServiceSpec-akka.actor.internal-dispatcher-4, akkaSource=InboundHandshake$$anon$2(akka://ModelServiceSpec), sourceActorSystem=ModelServiceSpec, akkaTimestamp=21:08:17.218UTC}

it seems to me that the prob is using address akka://ModelServiceSpec@127.0.0.1:2551 where the real actor is using akka://nt-ui@127.0.0.1:2551.
“nt-ui” is my project’s name.

can someone explain the meaning of “Check that the sending system uses the same address to contact recipient system as defined in the ‘akka.remote.artery.canonical.hostname’ of the recipient system. The name of the ActorSystem must also match.”

thanks!

It seems to be fixed with

class ModelServiceSpec
    extends ScalaTestWithActorTestKit(ConfigFactory.parseString("""
    akka.remote.artery.canonical.hostname = "nt-ui"
    """))

I would still appreciate some explanation.

now the logs show that the queue stream is never run.
what am I doing wrong?

actually, logging with context.log.info inside createRequest doesn’t work. when I replace with println the stream does run.
the issue is .via(poolClientFlow). but there are no logs from it after queue.offer(cmd) has run.

I think I’ve found the real issue.

In

  def createRequest(predictionCommand: GetPrediction): (HttpRequest, ActorRef[Reply]) = {

        val entity: RequestEntity = HttpEntity.apply(
          MediaTypes.`application/json`,
          s"""
            {
                "html_text": ${predictionCommand.htmlInput},
                "threashold": ${predictionCommand.probabilityThreshold}
            }
        """.stripMargin)

        println(s"HTTP entity is $entity")

        val request = HttpRequest(HttpMethods.POST, Uri(s"$modelServiceHost:$port$path"), entity = entity)

        println(s"HTTP request is $request")

        request -> predictionCommand.replyTo

      }

the first println is run while the second isn’t.
This behavior is so unexpected. what can cause it?

If I run (almost) the same thing in am app, it works.
As in

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util.{ Failure, Success }
import akka.stream.scaladsl.Source
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Sink

object HttpClientSingleRequest {

  sealed trait Command
  case class GetPrediction(htmlInput: String, probabilityThreshold: Float) extends Command

  implicit val system = ActorSystem("nt-ui")

  val poolClientFlow =
    Http()(system).cachedHostConnectionPool[Unit]("127.0.0.1", 8080)

  def createRequest(predictionCommand: GetPrediction): (HttpRequest, Unit) = {

    println(s"createRequest called")

    val entity: RequestEntity = HttpEntity.apply(
      MediaTypes.`application/json`,
      s"""
            {
                "html_text": "${predictionCommand.htmlInput}",
                "threashold": ${predictionCommand.probabilityThreshold}
            }
        """.stripMargin)

    //TODO delete me
    println(s"HTTP entity is $entity")

    val request = HttpRequest(HttpMethods.POST, Uri(s"http://127.0.0.1:8080/model"), entity = entity)

    //TODO delete me
    println(s"HTTP request is $request")

    request -> ()

  }

  def main(args: Array[String]): Unit = {

    val queue = Source
      .queue[GetPrediction](10, OverflowStrategy.dropNew)
      .map(a => { println(a); a })
      .map(createRequest)
      .via(poolClientFlow)
      .to(Sink.foreach({
        case (Success(resp), replyTo) => println(s"recieved response $resp")
        case (Failure(e), replyTo)    => sys.error("something wrong")
      }))
      .run()

    queue.offer(GetPrediction("this is a test scala question", 0.7f))

  }
}

I don’t understand why it doesn’t work in the actor test setting

I figured out.
There was a bug in the URI string interpolation.
It should be Uri(s"http://$modelServiceHost:$port$path") instead of Uri(s"$modelServiceHost:$port$path"),

The confusing part is that this user bug causes println not to run.
I think it is related to the URI validation logic. I would expect either an error log message or an exception. but it seems that it just hangs there.

I forgot that a stream started in an actor doesn’t run on the same thread of the actor. Or, I should say its running context is not managed by the actor. So, it doesn’t make sense to access context.log of the actor to log in the stream.

Hi @zhenhao,

most likely the problem is that Uri.apply will throw an exception that will end up somewhere downstream but you don’t log it properly.

In the example you shared it would probably abort the stream. You need to keep the materialized value of Sink.foreach to get access to a Future[Done] that will be completed with a Failure which will contain the stack trace.

You can try changing the code to

val (queue, resultFut) = 
  // ...
  // .toMat(Sink,foreach{...})(Keep.both).run()

resultFut.onComplete {
  case Success(_) => // stream completed successfully
  case Failure(ex) => // stream failed with an error, here you should be able to see the exception
}
1 Like

I think you are right. But it is not intuitive how to set up logging in Akka stream. I need to do more reading.