Ending a stream that materializes an actor via ActorSource.actorRef?

I have a Source[T, ActorRef[T]] that was created via calling ActorSource.actorRef[T] as per ActorSource.actorRef • Akka Documentation. When I send the type matched by the completion matcher, the stream does not end as expected. Is there some other action I need to take to successfully complete the stream?

Here is code:

sealed trait ActorProtocol
case class Response(r: HttpResponse) extends ActorProtocol
object Complete extends ActorProtocol

// ... in a function that creates the stream
val (actor, httpSource) = ActorSource.actorRef[ActorProtocol](
      completionMatcher = { case Complete => println("Complete message sent!")},
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      OverflowStrategy.dropNew
    ).preMaterialize()

// ... a while later, want to end the stream
actor ! Complete

// in console, see "Complete message sent!"

Since as the console showed, the message was received by the actor materialized by the stream, I’d expect the stream to complete, except it does not.

Here is some more code:

// a class to make completing the stream more ergonomic
sealed abstract class Completable[T](ref: ActorRef[T], completeMessage: T) with Cancellable {
   def complete = 
      cancel
      ref ! completeMessage
}

// in companion object
def fromCancellable[T](cancellable)(actor: ActorRef[T], completeMessage: T) = new Completable(actor, completeMessage) { override def cancel = cancellable.cancel }

// body of function that creates the stream - yields Source[HttpResponse, Completable]
val (actor, httpSource) = ActorSource.actorRef[ActorProtocol](
      completionMatcher = { case Complete => println("Complete message sent!")},
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      OverflowStrategy.dropNew
    ).preMaterialize()

val runnable = new Runnable {
      override def run(): Unit = httpClient.makeRequest(request) onComplete {
        case Failure(exception) => throw exception
        case Success(res) => 
          actor ! Response(res)
      }
    }
val completable = fromCancellable(akkaScheduler.scheduleAtFixedRate(Duration.Zero, dur)(runnable))
httpSource
  .via(Flow.fromFunction[ActorProtocol, HttpResponse] { p => p match {
    case Response(r) => r
  }})
  .mapMaterializedValue(_ => completable)

// how it gets called
val (completable, stream) = codeToCreateStream

// ... some time later
completable.complete
// see complete message printed, expect stream to complete, but it does not :(

The stream won’t end as long as some “upstream” element has values to emit. This means that the ActorSource can complete without actually terminating the whole graph. It’s difficult to guess what might be causing it to stay alive without seeing more of your graph.

Try adding a watchTermination • Akka Documentation immediately after the ActorSource and have it print out when upstream closes.

Eg:

val (actor, httpSource) = ActorSource.actorRef[ActorProtocol](
  completionMatcher = { case Complete => println("Complete message sent!")},
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  OverflowStrategy.dropNew
).watchTermination() { (mat, future) =>
  future.onComplete { t =>
    println("ActorSource completed with $t")
  }
  mat
}.preMaterialize()

watchTermination will report if upstream closes (in this case, just the ActorSource), or downstream cancels, or anyone at all has an unrecovered error. Keep in mind a graph can continue running even with many parts of it shut down as long as there is data to move.