Why is my Source.actorRef only emitting a single message?

I’m trying to replicate the behavior of zipping sources like in the Akka Streams cookbook without using the GraphDSL. I have the following test:

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.pipe
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.testkit.{TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class TriggeredSourceReaderSpec extends TestKit(ActorSystem("triggered-source-reader"))
    with WordSpecLike with Matchers with BeforeAndAfterAll {

  override def afterAll(): Unit = try {
    super.afterAll()
  } finally {
    system.terminate()
  }

  sealed trait TestMessage
  final case class Work(jobNumber: Int) extends TestMessage
  case object Trigger

  private final val TRIGGER_BUFFER_SIZE = 1

  private implicit val materializer: ActorMaterializer = ActorMaterializer()
  private implicit val ec: ExecutionContext = system.dispatcher

  "A triggered source reader" should {
    "not emit anything until a trigger message is received" in {

      val testProbe = TestProbe()

      val jobs = (1 to 10).map(Work)

      val source: Source[Work, NotUsed] = Source(jobs)

      val (actor, triggerSource): (ActorRef, Source[Trigger.type, NotUsed]) =
        Source.actorRef[Trigger.type](TRIGGER_BUFFER_SIZE, OverflowStrategy.dropNew).preMaterialize()

      val zippedSources =
        source.zipWith(triggerSource)((msg: Work, trigger: Trigger.type) => msg)

      zippedSources.runWith(Sink.head).pipeTo(testProbe.ref)

      jobs.foreach { job =>
        actor ! Trigger
        val msg: Work = testProbe.expectMsgType[Work](5.seconds)
        msg should===(job)
        testProbe.expectNoMessage(2.seconds)
      }

      testProbe.expectNoMessage(3.seconds)

    }
  }

}

This test fails after Work(1) is received, claiming that the second Trigger message was not delivered.

Why is there only a single message being emitted?

Try Sink.last instead. Sink.head completes its Future with the first element and then cancels upstream, so the whole stream stops after Work(1).
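
To illustrate the difference, here is a minimal sketch, assuming the Akka 2.5-style API used in the question (ActorMaterializer). The object name HeadVersusLast and its run method are made up for this example. Keep in mind that Sink.last only yields a value once upstream completes, so in the test above it would deliver a single Work after all ten triggers rather than one per trigger.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of the original test
object HeadVersusLast {
  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("head-versus-last")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val numbers = Source(1 to 10)

      // Sink.head: the Future completes with the first element,
      // then the sink cancels upstream and the stream is finished
      val first = Await.result(numbers.runWith(Sink.head), 3.seconds)

      // Sink.last: the Future completes only when upstream completes,
      // carrying the final element
      val last = Await.result(numbers.runWith(Sink.last), 3.seconds)

      (first, last)
    } finally {
      system.terminate()
    }
  }
}
```

Calling HeadVersusLast.run() should give (1, 10): one value per materialized stream in both cases, never one value per element.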


@tg44 Oh, how silly of me. This wasn’t quite what I wanted, but it put me on the right track. I ended up switching it to:

zippedSources.runForeach(testProbe.ref ! _)

and it worked exactly as intended!
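
For anyone landing here later, this is roughly what the working pattern looks like outside the test. It is only a sketch under a few assumptions: the Akka 2.5-style API from the question, a made-up object name TriggeredZipDemo, and a LinkedBlockingQueue standing in for the TestProbe so the example does not need akka-testkit.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.Source

// Hypothetical demo object, not part of the original test
object TriggeredZipDemo {
  final case class Work(jobNumber: Int)
  case object Trigger

  def run(): Seq[Work] = {
    implicit val system: ActorSystem = ActorSystem("triggered-zip-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val jobs = (1 to 3).map(Work(_))
      // Stand-in for the TestProbe: collects whatever the stream emits
      val received = new LinkedBlockingQueue[Work]()

      // Pre-materialize so the trigger ActorRef is available before the stream runs
      val (trigger, triggerSource) =
        Source.actorRef[Trigger.type](1, OverflowStrategy.dropNew).preMaterialize()

      // zipWith emits one Work per Trigger; runForeach keeps consuming
      // until upstream completes instead of cancelling after one element
      Source(jobs)
        .zipWith(triggerSource)((work, _) => work)
        .runForeach(work => received.put(work))

      // Send one trigger at a time and wait for the matching job
      jobs.map { _ =>
        trigger ! Trigger
        received.poll(3, TimeUnit.SECONDS)
      }
    } finally {
      system.terminate()
    }
  }
}
```

Each trigger is sent only after the previous job has arrived, so the one-slot trigger buffer with OverflowStrategy.dropNew never has to drop anything. Firing triggers faster than the stream consumes them would silently lose some.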