I have an actor source which will emit jobs when a trigger message is received:
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.typesafe.scalalogging.LazyLogging
/**
* Reads from a source when a trigger is received and then pushes the message downstream.
*
* @param elements the source to be read on trigger.
* @param triggerSource the source which will receive trigger messages.
* @tparam Message the type of message the read-source will produce.
* @tparam Trigger the type of message the trigger-source will accept.
*/
class TriggeredSourceReader[Message, Trigger](
protected val elements: Source[Message, NotUsed],
protected val triggerSource: Source[Trigger, NotUsed]
) extends LazyLogging {
val source = elements.zipWith(triggerSource)((msg: Message, trigger: Trigger) ⇒ {
logger.debug(s"Releasing $msg")
msg
})
}
I set its buffer size to n
and make the flow return instantly. Despite this, I find that when I send more than n
messages (at a rate of 1 msg/s), the messages after the first n
are ignored.
I tried setting the overflow strategy to fail
, but it was ignored completely.
What am I overlooking here?