Delete a file after its streaming is completed

As a follow-up from my first post, I now want to delete the files listed by Directory.ls for which the streaming has been completed. How is that achievable?

If you use the materialized values correctly, you can use its returning future.

For ex:

val ret: Future[String] = Source.single("data").runWith(Sink.last)
ret.onComplete(_ => deleteFile)

Docs: https://doc.akka.io/docs/akka/current/stream/stream-composition.html#materialized-values

What if the overall pipeline is more complicated though, i.e. something like:

Source(Seq("test1", "test2", "test3").toList)
  .map(ByteString(_))
  .via(???)
  .via(???)
  .runWith(Sink.ignore)

Is there a way to access the Future[String] corresponding to the streamed value (here “test1”, “test2”, “test3”) as soon as map(ByteString(_)) completes for each one respectively?

Something like this?

Source(Seq("test1", "test2", "test3").toList)
  .map(ByteString(_))
  .watchTermination
  .viaMat(???)(Keep.left)
  .viaMat(???)(Keep.left)
  .toMat(Sink.ignore)(Keep.left).run