How to shut down a stream prefixed with a MergeHub.source without losing elements


(Heiko Seeberger) #1

MergeHub.source takes a bufferSize argument which cannot be zero. When using a kill switch as shown below, shutting down loses the elements buffered in the source.

Are there ways to shut down without losing the buffered elements, other than the kill switch approach shown below? If not, do you think it’s possible to change the implementation of MergeHub to allow a bufferSize of zero?

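import akka.stream.KillSwitches
import akka.stream.scaladsl.{ Keep, MergeHub, Sink }
// Keep.left retains the hub sink and the kill switch; runWith would return only the Future[Done] of Sink.ignore.
val (hubSink, killSwitch) =
  MergeHub
    .source[Foo](bufferSize)
    .viaMat(KillSwitches.single)(Keep.both)
    .via(processFoos)
    .toMat(Sink.ignore)(Keep.left)
    .run()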