How to check if the Stream is running or not?

Hi guys,

I need to check if my stream is running or not, need this for make a health check for Kubernetes.

Other question, Supervision.Stop, not kill main process, how to make this?

This part of my code…

lazy val decider: Supervision.Decider = {
case e: JsResultException =>
exceptionHandling(e)
Supervision.Resume

case e: JsonEOFException =>
  exceptionHandling(e)
  Supervision.Resume

case e: Exception =>
  exceptionHandling(e)
  Supervision.Resume

case _ => log.info("Fatal error."); Supervision.Stop

}

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

  lazy val source: Outlet[ConsumerRecord[String, String]] = builder.add {
    Consumer plainSource(consumerSettings(), Subscriptions.topics(conf.getString("app.kafka.source-topic")))
  }.out

  lazy val jsonParser: FlowShape[ConsumerRecord[String, String], JsObject] = builder.add {
    Flow[ConsumerRecord[String, String]].async map { m => Json.parse(m.value).asInstanceOf[JsObject] }
  }

  lazy val applyRules: FlowShape[JsObject, JsObject] = builder.add {
    Flow[JsObject].async map BusinessRules.apply
  }

  lazy val jsonStringify: FlowShape[JsObject, String] = builder.add {
    Flow[JsObject].async map Json.stringify
  }

  lazy val producerRecord: FlowShape[String, ProducerRecord[String, String]] = builder.add {
    Flow[String].async map { m => new ProducerRecord[String, String](conf.getString("app.kafka.sink-topic"), m) }
  }

  lazy val sink: Inlet[ProducerRecord[String, String]] = builder.add {
    Producer plainSink producerSettings
  }.in

  source ~> jsonParser ~> applyRules ~> jsonStringify ~> producerRecord ~> sink

  ClosedShape
}).run()

Hi. You can Implement your stream as a Stream and Return Consumer.Control object that contains isShutdown: future[Done]. That future will complete with stream.
About complete main Process. If you mean main thread then you can use system.exit future above

I think in Matherializer better match on NonFatal(ex) rather then Exception