We have the following stream source:
val (actorRef, stream) = Source.actorRef[(Event, Promise[Done])](1024, OverflowStrategy.dropHead).preMaterialize()
As shown above, the buffer size is 1024 and the overflow strategy is dropHead.
So when the buffer is already full and a 1025th Event arrives, an element gets dropped (with dropHead, the oldest buffered one), but the Promise associated with the dropped Event is left orphaned for the rest of the stream processing. That Promise is never completed, so the associated Future never completes. We need a way to get a handle on the dropped promises so that we can complete them with an appropriate failure.
A potential solution is to introduce a method on Source, similar to preMaterialize(), e.g. droppedMessages(), which returns a tuple of the original stream, Source[(Event, Promise[Done]), NotUsed], and a new stream of the same type, Source[(Event, Promise[Done]), NotUsed], that emits the dropped elements. For example:
val (stream, droppedEvents) = Source.actorRef[(Event, Promise[Done])](1024, OverflowStrategy.dropHead).droppedMessages()
Please note that our use case is to perform an action (i.e. completing the Promise) on a message just before it gets dropped, and we are totally fine with discarding the message after that action.
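
To make the desired behaviour concrete, here is a minimal sketch of the "fail the promise just before dropping" semantics, written as a plain bounded buffer outside of Akka. It is not an existing Akka API and not a proposal for the implementation. Event and Done are stand-ins defined locally so the snippet runs as a Scala script without Akka on the classpath, and DropHeadBuffer and DroppedException are hypothetical names introduced only for illustration.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

// Stand-ins (assumptions) so the sketch compiles without Akka.
final case class Event(id: Int)
sealed trait Done
case object Done extends Done

// Hypothetical failure type used to complete the promise of a dropped element.
final class DroppedException(msg: String) extends RuntimeException(msg)

// Bounded FIFO buffer with drop-head semantics: when the buffer is full,
// the oldest entry is evicted and its promise is failed before the new
// entry is appended.
final class DropHeadBuffer(capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  private val queue = mutable.Queue.empty[(Event, Promise[Done])]

  def offer(event: Event, promise: Promise[Done]): Unit = synchronized {
    if (queue.size == capacity) {
      val (dropped, droppedPromise) = queue.dequeue()
      // The action we want to run on an element just before it is discarded.
      droppedPromise.tryFailure(
        new DroppedException(s"Event ${dropped.id} dropped: buffer full"))
    }
    queue.enqueue((event, promise))
  }

  // Removes and returns the oldest buffered entry, if any.
  def poll(): Option[(Event, Promise[Done])] = synchronized {
    if (queue.isEmpty) None else Some(queue.dequeue())
  }
}

// Usage: with capacity 2, offering a third event evicts the first one
// and fails its promise; the other two promises stay pending.
val buffer = new DropHeadBuffer(capacity = 2)
val promises = Vector.fill(3)(Promise[Done]())
promises.zipWithIndex.foreach { case (p, i) => buffer.offer(Event(i + 1), p) }
```

With the proposed droppedMessages(), the same action would presumably be applied by consuming the droppedEvents stream and failing each Promise it emits, instead of the promise being lost inside the actor's buffer.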