[Question] How to do streaming with an actor using Akka gRPC

Hi, I am new to akka-grpc and have a question about the streaming API.

In the example below, I am trying to return a Source[HelloReply, NotUsed] as the response of the API.

service GreeterService {
    rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
}

Since new messages will be offered to the Source, I am trying to use Source.queue and return it (the Source[HelloReply, NotUsed]) to the client.

    val (queue, source): (SourceQueueWithCompleteAndSize[HelloReply],
      Source[HelloReply, NotUsed]) = Source
      .queue[HelloReply](bufferSize, OverflowStrategy.backpressure)
      .toMat(
        PartitionHub.sink((size, elem) ⇒ math.abs(elem.hashCode % size),
          startAfterNrOfConsumers = 1,
          bufferSize = 256))(Keep.both)
      .run()

The problem is that there could be multiple queues, and I want to create a corresponding actor for each queue to wrap it.

Then the gRPC service implementation class will hold a pool (a Map) of those actors and "ask" each actor for its Source.
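That pool could look roughly like this (a sketch only; `QueueActor`, the pool size, and routing on `request.name` are my assumptions, not from the question):

```scala
import akka.actor.{ActorRef, ActorSystem, Props}

class GreeterServiceImpl(system: ActorSystem) extends GreeterService {
  // One queue-wrapping actor per partition; size and naming are arbitrary here.
  private val pool: Map[Int, ActorRef] =
    (0 until 4).map(i => i -> system.actorOf(Props[QueueActor], s"queue-$i")).toMap

  // Route each request to a stable actor, e.g. by hashing a request field.
  private def actorFor(request: HelloRequest): ActorRef =
    pool(math.abs(request.name.hashCode % pool.size))
}
```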

  override def itKeepsReplying(
      request: HelloRequest): Source[HelloReply, NotUsed] = {
    (someActor ? Fetch(request))
      .mapTo[...] // this will be a Future
      ...

But the return type of the "ask" pattern is a Future. No matter how many transformations I apply, the outer type stays Future; only the inner type changes.

Future[Message] => Future[Intermediate] => Future[HelloReply]

But as per the proto spec, what I need is Source.

  override def itKeepsReplying(
      request: HelloRequest): Source[HelloReply, NotUsed]

I looked into Source.fromFuture, Source.fromFutureSource, and Source.actorRef, but it seems none of them fits this situation.

Is there a good way to provide a streaming API using an actor?
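For what it's worth, if the actor replies with the Source itself, `Source.fromFutureSource` can flatten the `Future[Source[...]]` that "ask" produces. A sketch, reusing `someActor` and `Fetch` from above and assuming the actor replies with a `Source[HelloReply, NotUsed]`:

```scala
import akka.NotUsed
import akka.pattern.ask
import akka.stream.scaladsl.Source
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = 3.seconds

override def itKeepsReplying(request: HelloRequest): Source[HelloReply, NotUsed] =
  Source
    .fromFutureSource(
      (someActor ? Fetch(request)).mapTo[Source[HelloReply, NotUsed]])
    // fromFutureSource materializes a Future of the inner materialized value,
    // so map it back to NotUsed to match the generated signature.
    .mapMaterializedValue(_ => NotUsed)
```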

I tried this and it worked, though backpressure is not handled properly with it.

val (ref: ActorRef, publisher: Publisher[HelloReply]) =
  Source.actorRef[HelloReply](bufferSize = 1000, OverflowStrategy.dropHead)
    .toMat(Sink.asPublisher(fanout = true))(Keep.both)
    .run()
val source = Source.fromPublisher(publisher)
// `stream` and `killSwitch` come from the surrounding context
val runnableGraph = stream
  .via(killSwitch.flow)
  .toMat(PartitionHub.sink((size, elem) => math.abs(elem.key.hashCode % size),
    startAfterNrOfConsumers = 1,
    bufferSize = 256))(Keep.right)
runnableGraph.runForeach(msg => ref ! msg)
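Since Source.actorRef cannot backpressure (it drops or fails according to its OverflowStrategy), one variant that does preserve backpressure is to feed a Source.queue instead of an actor ref and wait on each offer. A sketch, with `stream` standing for the same upstream as above:

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

val (queue, publisher) =
  Source.queue[HelloReply](bufferSize = 1000, OverflowStrategy.backpressure)
    .toMat(Sink.asPublisher(fanout = true))(Keep.both)
    .run()

// mapAsync(1) waits for each offer to be accepted before pulling the next
// element, so the upstream is slowed down instead of messages being dropped.
stream.mapAsync(1)(msg => queue.offer(msg)).runWith(Sink.ignore)
```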

Thank you for the answer.

I finally implemented it like this:

In the actor

  val (queue, fromProducer): (SourceQueueWithCompleteAndSize[HelloReply],
                              Source[HelloReply, NotUsed]) = MeasurableSourceQueue
    .queue[HelloReply](100, OverflowStrategy.backpressure)
    .toMat(
      PartitionHub.sink((size, elem) ⇒ math.abs(elem.hashCode % size),
                        startAfterNrOfConsumers = 1,
                        bufferSize = 1))(Keep.both)
    .run()

  def receive = {
    case request: HelloRequest =>
      println(self)
      sender() ! fromProducer
  }

In the service implementation

  override def itKeepsReplying(
      in: HelloRequest): Source[HelloReply, NotUsed] = {
    Source
      .single(in)
      .ask[Source[HelloReply, NotUsed]](1)(actor)
      .flatMapConcat(x => x)
  }
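One detail worth noting: the `.ask` stage needs an implicit `Timeout` in scope, and it fails the stream if the actor does not reply in time. A sketch of the surrounding setup (the 3-second value is my choice, not from the snippet):

```scala
import akka.util.Timeout
import scala.concurrent.duration._

// Required by .ask[S](parallelism)(ref); the stream fails with an
// AskTimeoutException if the actor does not reply within this window.
implicit val timeout: Timeout = 3.seconds
```

With parallelism = 1 as in the snippet above, at most one ask is outstanding at a time, which keeps request ordering intact.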

Thank you for the reply.
That looks like another interesting approach.