Why is my 'triggerable' Akka source's buffer overflow strategy being ignored?

scala

(Elijah Rippeth) #1

I have an actor source which will emit jobs when a trigger message is received:

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.typesafe.scalalogging.LazyLogging

/**
  * Reads from a source when a trigger is received and then pushes the message downstream.
  *
  * @param elements the source to be read on trigger.
  * @param triggerSource the source which will receive trigger messages.
  * @tparam Message the type of message the read-source will produce.
  * @tparam Trigger the type of message the trigger-source will accept.
  */
class TriggeredSourceReader[Message, Trigger](
  protected val elements: Source[Message, NotUsed],
  protected val triggerSource: Source[Trigger, NotUsed]
) extends LazyLogging {

  val source: Source[Message, NotUsed] = elements.zipWith(triggerSource)((msg: Message, trigger: Trigger) => {
    logger.debug(s"Releasing $msg")
    msg
  })

}

I set the source's buffer size to n and made the downstream flow return immediately. Even so, when I send more than n messages (at a rate of 1 msg/s), every message after the first n is ignored.

I tried setting the overflow strategy to fail, but it was ignored completely.

What am I overlooking here?


(Elijah Rippeth) #2

The problem was on my side: my flow never sent the Trigger message back to the source!
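
For anyone who hits the same thing, here is a minimal sketch of why a missing trigger stalls everything. zipWith only emits once both inputs have an element, so the number of released messages can never exceed the number of triggers received. The sketch assumes Akka 2.6+ (where an implicit ActorSystem supplies the materializer), and ZipGatingDemo and releasedWith are made-up names for illustration, not part of my actual code:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object: shows that zipWith releases one element per trigger.
object ZipGatingDemo {

  /** Runs ten elements against `triggerCount` triggers and returns what got through. */
  def releasedWith(triggerCount: Int): Seq[Int] = {
    // Assumption: Akka 2.6+, so the implicit system also acts as the materializer.
    implicit val system: ActorSystem = ActorSystem("zip-gating-demo")
    try {
      val elements = Source(1 to 10)
      val triggers = Source(List.fill(triggerCount)("go"))

      // Same shape as TriggeredSourceReader.source: keep the message, drop the trigger.
      val released = elements
        .zipWith(triggers)((msg, _) => msg)
        .runWith(Sink.seq)

      Await.result(released, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    println(releasedWith(3)) // three triggers, so only three elements are released
    println(releasedWith(0)) // no triggers, so nothing is released
  }
}
```

In this sketch the trigger source is finite, so the stream simply completes early. With a trigger source that stays open but never receives a message, the zip stage just waits instead.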