Hi guys,
I need to check whether my stream is running, so that I can build a health check for Kubernetes.
Another question: Supervision.Stop does not kill the main process. How can I make it do that?
This is part of my code…
lazy val decider: Supervision.Decider = {
  case e: JsResultException =>
    exceptionHandling(e)
    Supervision.Resume
  case e: JsonEOFException =>
    exceptionHandling(e)
    Supervision.Resume
  case e: Exception =>
    exceptionHandling(e)
    Supervision.Resume
  case _ =>
    log.info("Fatal error.")
    Supervision.Stop
}
…
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  lazy val source: Outlet[ConsumerRecord[String, String]] = builder.add {
    Consumer.plainSource(consumerSettings(), Subscriptions.topics(conf.getString("app.kafka.source-topic")))
  }.out

  lazy val jsonParser: FlowShape[ConsumerRecord[String, String], JsObject] = builder.add {
    Flow[ConsumerRecord[String, String]].async.map { m => Json.parse(m.value).asInstanceOf[JsObject] }
  }

  lazy val applyRules: FlowShape[JsObject, JsObject] = builder.add {
    Flow[JsObject].async.map(BusinessRules.apply)
  }

  lazy val jsonStringify: FlowShape[JsObject, String] = builder.add {
    Flow[JsObject].async.map(Json.stringify)
  }

  lazy val producerRecord: FlowShape[String, ProducerRecord[String, String]] = builder.add {
    Flow[String].async.map { m => new ProducerRecord[String, String](conf.getString("app.kafka.sink-topic"), m) }
  }

  lazy val sink: Inlet[ProducerRecord[String, String]] = builder.add {
    Producer.plainSink(producerSettings)
  }.in

  source ~> jsonParser ~> applyRules ~> jsonStringify ~> producerRecord ~> sink

  ClosedShape
}).run()