Stop a dynamic stream in Lagom method

Hi,

Is it posible to detect clossing of web connection inside a Lagom service method ?

Bellow is the signature of the method

def streamQl(): ServiceCall[Source[QueryAst, NotUsed], Source[JsValue, NotUsed]]

In this method I start Akka stream, but dynamic one

val consumer = BroadcastHub.sink[JsValue](bufferSize = 16)

val runnableGraph =
  MergeHub.source[JsValue](perProducerBufferSize = 16)
    .viaMat(KillSwitches.single)(Keep.both)
    .toMat(consumer)(Keep.both)

val ((sink, killSwitch), source) = runnableGraph
  .run()

so it works constantly. The source from the tuple is returned to client.

I want to shutdown the stream after a web connection is closed.

Zlaja

I did it with alsoTo operator on incoming source.

incomingSource
      .alsoTo(Sink.ignore.mapMaterializedValue{ mat =>
        mat.onComplete{ _ =>
          log.debug("StreamQl alsoTo materialized value completed")
          killSwitchOpt.foreach(_.shutdown())
        }
        mat
      })