How to detect that a consumer has disconnected from a stream

Given the following:

    val ((queue, ks), source) =
      Source.queue[MyThing](queueSize, OverflowStrategy.dropHead)

How can I detect when the consumer of the source has disconnected so that I can clean up any resources that were created to serve this stream?

I tried adding:

    stream.watchTermination() { (_, done) =>
      done.onComplete { _ =>
        // cleanup
      }
    }

but the cleanup callback is never invoked.

Actually - cancel that. When I use watchTermination correctly, it does work. :doh:
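
For anyone landing here later, a minimal sketch of one way to wire this up, assuming Akka 2.6+ (where an implicit ActorSystem supplies the materializer). It is not necessarily what went wrong in my case, but one common pitfall is that watchTermination returns a new Source, so the callback only fires if that returned value is the one that gets run. The MyThing case class, the buffer size of 16, the object name and the use of Sink.head to simulate a consumer going away are all placeholders for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for the MyThing type in the question.
final case class MyThing(id: Int)

object WatchTerminationSketch {

  // Returns true if the cleanup callback ran after the consumer went away.
  def runSketch(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("watch-termination-sketch")
    import system.dispatcher // ExecutionContext for the Future callback

    val cleanedUp = new AtomicBoolean(false)

    // watchTermination returns a new Source. The callback is only wired in
    // if this returned value, not the original source, is what gets run.
    val watched =
      Source
        .queue[MyThing](16, OverflowStrategy.dropHead)
        .watchTermination() { (queue, done) =>
          // andThen yields a Future that completes after the cleanup ran.
          val cleaned = done.andThen { case _ => cleanedUp.set(true) }
          (queue, cleaned)
        }

    // Sink.head cancels upstream after one element, which stands in for a
    // consumer disconnecting.
    val ((queue, cleaned), first) = watched.toMat(Sink.head)(Keep.both).run()

    try {
      queue.offer(MyThing(1))
      Await.result(first, 5.seconds)
      Await.ready(cleaned, 5.seconds)
      cleanedUp.get()
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(s"cleanup ran: ${runSketch()}")
}
```

The termination future completes on normal completion, failure, or downstream cancellation, so the callback can inspect the result if the cleanup needs to distinguish between those cases.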
