Hello there,

For classic Akka actors, there is a convenient helper method, ActorFlow.actorRef, in play.api.libs.streams.

I needed the equivalent for typed actors and couldn't find one. Does something like this already exist? If not, this is the implementation I came up with:

import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.typed.scaladsl.{ActorSink, ActorSource}
import akka.stream.{Materializer, OverflowStrategy}

  private sealed trait EndOfStream
  private case object Success extends EndOfStream
  private case object Failure extends EndOfStream

  /**
   * Creates a [[Flow]] where incoming elements (of type `In`) are sent to the given behavior, and messages (of type
   * `Out`) sent to the created actor go downstream.
   * The `behavior` will receive an `ActorRef[Out]`, and all messages sent to it will go downstream.
   * This is particularly useful in Play WebSockets, where incoming messages from the client are handled by
   * the given behavior, and outgoing messages come from the actor.
   * @param behavior Function creating the behavior that receives (and handles) elements of type In coming from
   *                 upstream. It takes as input the actor whose received messages go downstream.
   * @param actorName name of the actor to spawn
   * @param bufferSize size of the buffer holding elements waiting to go downstream
   * @param overflowStrategy strategy describing how to handle buffer overflow
   * @param actorSystem surrounding actor system
   * @tparam In type of elements/messages coming from upstream
   * @tparam Out type of elements/messages going downstream
   * @return a [[Flow]] from In to Out.
   */
  def actorRef[In, Out](
      behavior: ActorRef[Out] => Behavior[In],
      actorName: String,
      bufferSize: Int                    = 16,
      overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
  )(implicit actorSystem: ActorSystem): Flow[In, Out, _] = {
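    // Source side: every message sent to outActor is emitted downstream.
    // The two matchers never match, so no message completes or fails the stream.
    val (outActor, publisher) = ActorSource
      .actorRef[Out](
        { case _ if false      => }: PartialFunction[Out, Unit],
        { case e: Any if false => new Exception(e.toString) }: PartialFunction[Any, Throwable],
        bufferSize,
        overflowStrategy
      )
      .toMat(Sink.asPublisher(false))(Keep.both)
      .run()

    // Sink side: upstream elements are wrapped in Right and forwarded to the user's actor;
    // Left(...) signals that the stream has completed or failed.
    val sink = Flow[In]
      .map(Right[EndOfStream, In])
      .to {
        ActorSink.actorRef[Either[EndOfStream, In]](
          actorSystem.spawn(
            Behaviors.setup[Either[EndOfStream, In]] { context =>
              val flowActor = context.spawn(behavior(outActor), "flowActor")
              context.watch(flowActor)

              Behaviors
                .receiveMessage[Either[EndOfStream, In]] {
                  case Right(in) =>
                    flowActor ! in
                    Behaviors.same
                  case Left(_) =>
                    context.stop(flowActor)
                    Behaviors.same
                }
                .receiveSignal {
                  case (_, Terminated(_)) =>
                    Behaviors.stopped
                }
            },
            actorName
          ),
          Left(Success),
          _ => Left(Failure)
        )
      }

    Flow.fromSinkAndSource(sink, Source.fromPublisher(publisher))
  }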

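For reference, usage might look roughly like the sketch below. It assumes the method above lives in an object named ActorFlow (that name is my assumption here). The Echo behavior, echoFlow and the "echo-flow" actor name are made up for illustration only.

```scala
import akka.actor.ActorSystem
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.Flow

// Hypothetical behavior: answers every incoming String by sending a reply to `out`,
// the actor whose received messages go downstream.
object Echo {
  def apply(out: ActorRef[String]): Behavior[String] =
    Behaviors.receiveMessage[String] { msg =>
      out ! s"echo: $msg"
      Behaviors.same
    }
}

object EchoFlowExample {
  // Assumption: the actorRef method from this post is defined in an object named ActorFlow.
  def echoFlow(implicit system: ActorSystem): Flow[String, String, _] =
    ActorFlow.actorRef[String, String](out => Echo(out), actorName = "echo-flow")
}
```

In a Play controller, such a flow could presumably be returned from WebSocket.accept[String, String]. Since actor names must be unique among siblings, a real handler would likely need a distinct actorName per connection.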

It probably needs a bit of polishing, but otherwise, would this be suitable for a pull request?

Thanks in advance!