Working with eof for a Source/Flow

I have come across an interesting use case regarding stream boundaries and detection of eof. The context is an asynchronous JSON parser that is specifically designed to lazily parse elements of a JSON stream. I am using https://github.com/mdedetrich/akka-streams-json to do this, but it solves the same problem as Alpakka’s own JSON framing library at JSON • Alpakka Documentation (it just uses Jawn/Circe underneath for the parsing/serializing).
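For reference, the test snippets below assume roughly the following setup. The exact package path for CirceStreamSupport is my assumption based on the library’s docs and may differ between versions, and an implicit ActorSystem is assumed to be in scope for materialization:

// Assumed imports for the snippets below; the CirceStreamSupport package path in
// particular is an assumption and may vary between akka-streams-json versions.
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.mdedetrich.akka.stream.support.CirceStreamSupport._ // provides decode
import org.typelevel.jawn.AsyncParser
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec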

Akka-streams-json uses Jawn under the hood, which also supports asynchronous parsing (see https://github.com/typelevel/jawn#parsing). In my case I am dealing with streams of JSON that contain a massive JSON array, so if you want to parse each element of the JSON array using akka-streams-json you would use AsyncParser.UnwrapArray, i.e.

val endingNullArrayJson = """[1,2,3,null]"""

"AsyncParser" should {
    val entity = Source.single(ByteString(endingNullArrayJson))
    "produce all values" in {
        val parsed = entity.via(decode[Option[Int]](AsyncParser.UnwrapArray)).runWith(Sink.seq)

        parsed.map {
            _ shouldBe Seq(Some(1),
                           Some(2),
                           Some(3),
                           None
            )
        }
    }
}

This code works as expected. However, let’s say we are dealing with a Source that contains multiple JSON arrays, where the Source correctly emits one complete JSON array per element, i.e.

val endingNullArrayJson = """[1,2,3,null]"""

"AsyncParser" should {
    val entity = Source(List.fill(3)(ByteString(endingNullArrayJson)))
    "produce all values" in {
        val parsed = entity.via(decode[Option[Int]](AsyncParser.UnwrapArray)).runWith(Sink.seq)

        parsed.map {
            _ shouldBe Seq(Some(1),
                           Some(2),
                           Some(3),
                           None,
                           Some(1),
                           Some(2),
                           Some(3),
                           None,
                           Some(1),
                           Some(2),
                           Some(3),
                           None
            )
        }
    }
}

This then fails to run correctly, producing the following error:

expected eof got '[1,2,3...' (line 1, column 13)
org.typelevel.jawn.ParseException: expected eof got '[1,2,3...' (line 1, column 13)
	at org.typelevel.jawn.Parser.die(Parser.scala:131)
	at org.typelevel.jawn.Parser.die(Parser.scala:89)
	at org.typelevel.jawn.AsyncParser.churn(AsyncParser.scala:191)
	at org.typelevel.jawn.AsyncParser.absorb(AsyncParser.scala:98)
	at org.mdedetrich.akka.json.stream.JsonStreamParser$ParserLogic.emitOrPullLoop(JsonStreamParser.scala:107)
	at org.mdedetrich.akka.json.stream.JsonStreamParser$ParserLogic.org$mdedetrich$akka$json$stream$JsonStreamParser$ParserLogic$$upstreamPush(JsonStreamParser.scala:88)
	at org.mdedetrich.akka.json.stream.JsonStreamParser$ParserLogic$$anon$2.onPush(JsonStreamParser.scala:80)

This points to jawn/AsyncParser.scala at 8d802a67cd0e7fe45284d514597eac5d7ae78034 · typelevel/jawn · GitHub.

Herein lies the core issue: when you use AsyncParser.UnwrapArray with Jawn it strictly expects there to be only a single JSON array per bytestream, so when you reach the end of the JSON array (i.e. just after ]) it expects to hit the EOF case via atEof (jawn/AsyncParser.scala at 8d802a67cd0e7fe45284d514597eac5d7ae78034 · typelevel/jawn · GitHub). In other words, the bytestream actually needs to be finished to trigger the EOF case, which is detected by checking whether the current cursor index ends up being greater than the current length of bytes. What happens above instead is that a [ (the start of the second JSON array) is sent after the ].
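To make the failure mode concrete, here is a minimal sketch of driving Jawn’s AsyncParser by hand (my own illustration using the jawn-ast facade, not code from akka-streams-json, so the facade wiring is an assumption):

import org.typelevel.jawn.{AsyncParser, Facade}
import org.typelevel.jawn.ast.{JValue, JawnFacade}

implicit val facade: Facade[JValue] = JawnFacade

val parser = AsyncParser[JValue](AsyncParser.UnwrapArray)

// The first array is fine: UnwrapArray emits its elements one by one as they are absorbed.
println(parser.absorb("[1,2,3,null]")) // Right(...) containing 1, 2, 3 and null

// Absorbing a second top level array fails, because after the closing ']' the parser
// only accepts EOF, mirroring the error from the stream above.
println(parser.absorb("[1,2,3,null]")) // Left(ParseException: expected eof got '[1,2,3...')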

Note that if you parse this example without using the framing/lazy parsing then it works fine, i.e.

val endingNullArrayJson = """[1,2,3,null]"""

"AsyncParser" should {
    val entity = Source(List.fill(3)(ByteString(endingNullArrayJson)))
    "produce all values" in {
        val parsed = entity
            .via(decode[List[Option[Int]]])
            .mapConcat(identity) // flatten the List decoded from each array so the result is one flat Seq
            .runWith(Sink.seq)

        parsed.map {
            _ shouldBe Seq(Some(1),
                           Some(2),
                           Some(3),
                           None,
                           Some(1),
                           Some(2),
                           Some(3),
                           None,
                           Some(1),
                           Some(2),
                           Some(3),
                           None
            )
        }
    }
}

So my question here is about how to handle this and what the expectations are. One can argue that Jawn is being “correct” here, because when you use AsyncParser.UnwrapArray it really expects a single valid JSON element per stream, and so the only way Jawn knows the byte array is finished is by eof. The annoying thing about this reasoning is its usability implications. For example, the above is a minimization of a real use case where I want to parse large S3 objects, each containing a JSON array, one after another. The initial implementation looked like this:

def restore: Future[Done] =
  Source
    .future(finalKeys)
    .flatMapConcat(Source.apply)
    .via(downloadFlow)
    .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray))
    .collect {
      case Some(reducedConsumerRecord) =>
        reducedConsumerRecord
    }
    .runWith(kafkaProducerInterface.getSink)

Nothing terribly fancy here: finalKeys is just a Future[List[String]], which is a collection of the S3 keys, and for each S3 key we want to download it (with downloadFlow) and then parse the JSON. This however doesn’t work because of the previously mentioned problem, so now I have to do this:

object Utils {
  /** Runs the lazy futures one after another: each future is only started once the
    * previous one has completed. Note that the resulting list comes back in reverse
    * order, which doesn't matter here since we only care about completion.
    */
  def runSequentially[A](
      lazyFutures: List[() => Future[A]]
  )(implicit ec: ExecutionContext): Future[List[A]] =
    lazyFutures.foldLeft(Future.successful(List.empty[A])) { (acc, curr) =>
      for {
        a <- acc
        c <- curr()
      } yield c :: a
    }
}

def restoreKey(key: String): Future[Done] = Source
  .single(key)
  .via(downloadFlow)
  .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray))
  .collect {
    case Some(reducedConsumerRecord) =>
      reducedConsumerRecord
  }
  .runWith(kafkaProducerInterface.getSink)

def restore: Future[Done] = {
  implicit val ec: ExecutionContext = system.dispatcher

  for {
    keys <- finalKeys
    _    <- Utils.runSequentially(keys.map(key => () => restoreKey(key)))
  } yield Done
}

This is undesirable, since I would ideally want to treat this as one single flow (at least to me this looks like a workaround, because you shouldn’t need to materialize the stream multiple times).

Another solution would be to somehow intersperse the ByteString Source directly with an eof so that Jawn’s atEof gets triggered, however I don’t think this is possible right now? Or is the better solution to handle this case explicitly in akka-streams-json as an edge case for AsyncParser.UnwrapArray in akka-streams-json/JsonStreamParser.scala at master · mdedetrich/akka-streams-json · GitHub, i.e. manually terminate the stream in the GraphStageLogic?

Finally, I guess it would be ideal to mimic what Alpakka JSON framing (JSON • Alpakka Documentation) does, for consistency reasons. Also, if there are any code examples of dealing with the same issue that would be greatly appreciated!

So, an update on this: I checked how Alpakka JSON behaves in this scenario and realized that it actually supports this use case (i.e. it will parse multiple top level JSON arrays without any issues). I wrote a simple test for this behavior at Add json streaming test for multiple top level array's by mdedetrich · Pull Request #2830 · akka/alpakka · GitHub.
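The gist of that behaviour, as a hedged sketch rather than the exact test from the linked PR, using Alpakka’s JsonReader and assuming that the "$[*]" JsonPath selects the elements of a top level array (again with an implicit ActorSystem in scope):

import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

// Three complete top level JSON arrays arriving one after another, as in the earlier examples.
val parsed = Source(List.fill(3)(ByteString("""[1,2,3,null]""")))
  .via(JsonReader.select("$[*]")) // emits one ByteString per selected array element
  .map(_.utf8String)
  .runWith(Sink.seq)

// Expected to complete with:
// Seq("1", "2", "3", "null", "1", "2", "3", "null", "1", "2", "3", "null")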

For the sake of consistency I will replicate the same behaviour in akka-streams-json.

So, after some investigation this ended up being a Jawn issue. There isn’t really a concept of EOF when it comes to continuous streams, and trying to fix this in akka-streams-circe either isn’t possible or isn’t suitable, because it would involve peeking ahead in a read-only ByteBuffer to see if you hit the [ of an additional JSON array.

I have instead created a PR in Jawn itself to handle this use case: Support multiple top level JSON arrays for UnwrapArray by mdedetrich · Pull Request #438 · typelevel/jawn · GitHub