Alpakka JSON Reading

Hi all,

I was looking around JSON reading & came across Alpakka. I thought I would give reading some JSON & printing it a go but nothing is printed in the console. The JSON is just a simple JSON string in the code.

import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{Broadcast, GraphDSL, JsonFraming, Keep, RunnableGraph, Sink, Source}
import akka.util.ByteString

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Main extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("custom-system")
  implicit val executionContext: ExecutionContext = actorSystem.getDispatcher

  val data =
    """
      |[
      |  {
      |    "id": 1,
      |    "name": "Bulbasaur",
      |    "type": [
      |      "Grass",
      |      "Poison"
      |    ],
      |    "HP": 45,
      |    "Attack": 49,
      |    "Defense": 49,
      |    "Sp. Attack": 65,
      |    "Sp. Defense": 65,
      |    "Speed": 45
      |  },
      |  {
      |    "id": 2,
      |    "name": "Ivysaur",
      |    "type": [
      |      "Grass",
      |      "Poison"
      |    ],
      |    "HP": 60,
      |    "Attack": 62,
      |    "Defense": 63,
      |    "Sp. Attack": 80,
      |    "Sp. Defense": 80,
      |    "Speed": 60
      |  }
      |]
      |""".stripMargin

  Source.single(ByteString.fromString(data))
    .via(JsonReader.select("$.rows[*].name"))
    .via(JsonFraming.objectScanner(100))
    .map(_.utf8String)
    .runForeach(x => println(x))
    .map { _ =>
      println("Finished !")
      actorSystem.terminate()
    }
    .recover {
      case e: Exception => println(s"Failed ! ${e.getMessage}")
        actorSystem.terminate()
    }
}

The program ends with Finished ! & everything closes but nothing is printed to console, but I can’t see why to be honest.

So I thought I would try a different approach:

val source = Source.single(data)

val printSink = Sink.foreach[String](println)

val transformPrintSink = JsonReader.select("$.rows[*].name")
  .map(byteString => byteString.utf8String)
  .toMat(printSink)(Keep.right)

val graph = GraphDSL.createGraph(transformPrintSink) { implicit builder =>
  (console) =>
    import GraphDSL.Implicits._

    val broadCast = builder.add(Broadcast[String](1))

    source ~> broadCast ~> console
    ClosedShape
}

val materialized = RunnableGraph.fromGraph(graph).run()

materialized.onComplete {
  case Success(_) =>
    actorSystem.terminate()
  case Failure(e) =>
    println(s"Failure: ${e.getMessage}")
    actorSystem.terminate()
}

Now if I use the GraphDSL.createGraph(printSink) then the entirety of the JSON is printed.
However using transformPrintSink I get a compilation error:

overloaded method ~> with alternatives:
  (to: akka.stream.SinkShape[String])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): Unit <and>
  (to: akka.stream.Graph[akka.stream.SinkShape[String], _])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): Unit <and>
  [Out](flow: akka.stream.FlowShape[String,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [Out](junction: akka.stream.UniformFanOutShape[String,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [Out](junction: akka.stream.UniformFanInShape[String,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [Out](via: akka.stream.Graph[akka.stream.FlowShape[String,Out],Any])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [U >: String](to: akka.stream.Inlet[U])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): Unit
 cannot be applied to (akka.stream.SinkShape[akka.util.ByteString])
    source ~> broadCast ~> console

Is this essentially telling me that I can’t connect a Sink that takes in a ByteString as it’s input to the graph because of some reason?

The reason that I wrote it the way that I have the second time around was because I was going to extend it to try & write the result of the transformation to a file as well as printing the transformed JSON to console, but that will be for another time after this.

Is there a reason there was no printing with the first attempt I had made at reading the JSON?

Regards.

The way you currently have it written is that the Source returns a [Future[Future[Terminated]. Therefore, the program ends before your future(s) return.

There’s also a problem with the select, so I just changed it to root ($), and increased the buffer so that it looks like this:

...
  val done: Future[Done] = Source.single(ByteString.fromString(data))
    .via(JsonReader.select("$"))
    .via(JsonFraming.objectScanner(1000))
    .map(_.utf8String)
    .runForeach(x => println(s"element: $x"))

  done
    .map { _ =>
      println("Finished !")
      actorSystem.terminate()
    }
    .recover {
      case e: Exception => println(s"Failed ! ${e.getMessage}")
        actorSystem.terminate()
    }

My output looks like this:

2022-06-22 13:58:27,507 INFO  akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
element: {"id":1,"name":"Bulbasaur","type":["Grass","Poison"],"HP":45,"Attack":49,"Defense":49,"Sp. Attack":65,"Sp. Defense":65,"Speed":45}
element: {"id":2,"name":"Ivysaur","type":["Grass","Poison"],"HP":60,"Attack":62,"Defense":63,"Sp. Attack":80,"Sp. Defense":80,"Speed":60}
Finished !
2022-06-22 13:58:27,865 INFO  akka.actor.CoordinatedShutdown [custom-system-akka.actor.default-dispatcher-5] - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]

I’ll leave it to you to tune your select.

Best, Michael

1 Like

Thank you Michael !
Yes this works. I see what you mean about the Future[Future[Terminated]. I really appreciate the response!

Kind Regards,
Dips.