I’m trying to replicate the behavior of zipping sources like in the Akka Streams cookbook without using the GraphDSL
. I have the following test:
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.pipe
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.testkit.{TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class TriggeredSourceReaderSpec extends TestKit(ActorSystem("triggered-source-reader"))
with WordSpecLike with Matchers with BeforeAndAfterAll {
override def afterAll(): Unit = try {
super.afterAll()
} finally {
system.terminate()
}
sealed trait TestMessage
final case class Work(jobNumber: Int) extends TestMessage
case object Trigger
private final val TRIGGER_BUFFER_SIZE = 1
private implicit val materializer: ActorMaterializer = ActorMaterializer()
private implicit val ec: ExecutionContext = system.dispatcher
"A triggered source reader" should {
"not emit anything until a trigger message is received" in {
val testProbe = TestProbe()
val jobs = (1 to 10).map(Work)
val source: Source[Work, NotUsed] = Source(jobs)
val (actor, triggerSource): (ActorRef, Source[Trigger.type, NotUsed]) =
Source.actorRef[Trigger.type](TRIGGER_BUFFER_SIZE, OverflowStrategy.dropNew).preMaterialize()
val zippedSources =
source.zipWith(triggerSource)((msg: Work, trigger: Trigger.type) ⇒ msg)
zippedSources.runWith(Sink.head).pipeTo(testProbe.ref)
jobs.foreach { job =>
actor ! Trigger
val msg: Work = testProbe.expectMsgType[Work](5.seconds)
msg should===(job)
testProbe.expectNoMessage(2.second)
}
testProbe.expectNoMessage(3.seconds)
}
}
}
This test fails after Work(1)
is received, claiming that the second Trigger
message was not delivered.
Why is there only a single message being emitted?