Streaming Json from one Play-App to another

I have two microservices running play. Now I need to consume a stream of json from one application (Source-App) in the other (Sink-App)

In the Source-App I have code like this:

val chunked = items
          .map(Json.toJson(_))
          .map(json => ByteString(json.toString()) ++ ByteString("\n"))
        Ok.chunked(chunked).as("application/stream+json")

and in the Sink-App, I’m trying to consume the data like this:

val source: Source[JsObject, NotUsed] = Source
      .fromFuture(
        wSClient
          .url(s"${config.endpoint}/v1/companies/${companyId.id}/userLevels")
          .addHttpHeaders(
            ("Accept", "application/stream+json")
          )
          .get()
          .map(resp => resp.bodyAsSource))
      .flatMapConcat(s => s) 
      .map(bs => bs.decodeString("UTF-8"))
      .map(s => {
        s // here now instead of getting multiple ByteString elements, its just one element already concatenated 
      })
      .map(Json.parse(_)

Is there something wrong in my thinking? It looks like the AhcWsClient is buffering the chunks and concatenating them together. Would akka-http solve this and give me access to the raw stream?

ok… after lots of debugging, I figured out that I should use stream() instead of get() … maybe it helps someone :)

1 Like