Akka web socket client not consuming message from Lagom stream

Hi,
I am having trouble to consume messages from Lagom stream in akka websocket client.
First, what is working fine:

There is a Lagom 1.6.4 service with the following method:

   override def tick( interval: Int ): ServiceCall[String, Source[String, NotUsed]] = {
      ServiceCall { msg =>
         Future.successful {
            println(s"Received message: ${msg}: ${interval}")
            Source.tick(
                  interval.milliseconds,
                  interval.milliseconds,
                  s"Hello, $msg!"
               )
               .mapMaterializedValue(_ => NotUsed)
         }
      }
   }

When I try to connect and consume messages from this method via https://www.websocket.org/echo.html, it works fine:

But when I try to create an akka websocket client, the incoming messages from Lagom service are not printed out.

I have tried both SingleWebSocketRequest or WebSocketClientFlow from Client-Side WebSocket Support • Akka HTTP. The only change to the example I have made was change the url of the service to “ws://localhost:9001/tick/1000” and added printing of other message types println("other message type received").

object WebSocketClientFlow {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println("Received: " + message.text)
        case _ =>
          println("other message type received")
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    //val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9001/tick/1000"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

      // just like a regular http request we can access response status which is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

The only output from the client which is printed out is:

Success(Done)
closed

and no incoming message. The service log displays the Received message: hello world!: 1000 text, so message sent by client to Lagom service was received properly. I suppose that the service responded properly too, but client did not print any incoming message.

What is interesting too is that when the same client tries to send a message to ws://echo.websocket.org, it works fine and the received message from the server is printed out properly.

Obviously: the client works, the service works, but they do not work together. What could be the problem here?

Thank you for any hints.
Ales

Maybe the the service method should like this:

 override def tick( interval: Int ): ServiceCall[String, Source[Message, NotUsed]] = {
      ServiceCall { msg =>
         Future.successful {
            println(s"Received message: ${msg}: ${interval}")
            Source.tick(
                  interval.milliseconds,
                  interval.milliseconds,
                  TextMessage(s"Hello, $msg!")
               )
               .mapMaterializedValue(_ => NotUsed)
         }
      }
   }

but then this error is raised:
could not find implicit value for parameter responseSerializer: com.lightbend.lagom.scaladsl.api.deser.MessageSerializer[akka.stream.scaladsl.Source[akka.http.scaladsl.model.ws.Message,akka.NotUsed], _].

I have tried to provide MessageSerializer[Source[Message], _], but was not able to find implicit serializer and I do not know how to create one.