akka grpc server stream cannot return every result in real time #1735

First of all, I have a server streaming method. Inside the method I ask an actor in a loop, and I expect that after each request is answered the application returns that result to the caller. But instead I found that it waits for all responses to be completed before returning anything. Is this normal?

// The code is as follows; I added a sleep to make the results easier to observe
import akka.NotUsed
import akka.stream.scaladsl.Source

override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
    Source(1 to 10).map(index => {
        Thread.sleep(5000)
        HelloReply(index.toString)
    })
}

To confirm whether gRPC itself supports this, I tried Java's implementation of the same thing:
To send data to the caller: responseObserver.onNext(responseBuilder.build());
To end the call: responseObserver.onCompleted();
Data is sent to the caller every time onNext is called.

So my questions are:
Is my assumption correct? Can akka-grpc do this?
If it can be done, please give an example.

In general you should never do blocking operations in a stream operator unless you configure it to run on a separate dispatcher dedicated to blocking work. This also means the operator becomes an async island, so the blocking only limits its own element throughput. There is more about the general problem in the docs here.
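For reference, a minimal sketch of isolating the blocking map on its own dispatcher; the dispatcher name and its configuration below are my assumptions, not something akka-grpc provides out of the box:

    // application.conf (assumed):
    //   my-blocking-dispatcher {
    //     type = Dispatcher
    //     executor = "thread-pool-executor"
    //     thread-pool-executor.fixed-pool-size = 16
    //   }
    import akka.NotUsed
    import akka.stream.ActorAttributes
    import akka.stream.scaladsl.Source

    override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] =
      Source(1 to 10)
        .map { index =>
          Thread.sleep(5000) // still blocking, but now isolated on its own thread pool
          HelloReply(index.toString)
        }
        .addAttributes(ActorAttributes.dispatcher("my-blocking-dispatcher"))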

If you just want to delay the response for experimentation, you should instead use the asynchronous .delay operator, which is non-blocking and allows the stream to do other work during the delay.
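For example, a sketch of the same stream using .delay instead of Thread.sleep; the backpressure overflow strategy is my choice here, not something the original code requires:

    import akka.NotUsed
    import akka.stream.DelayOverflowStrategy
    import akka.stream.scaladsl.Source
    import scala.concurrent.duration._

    override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] =
      Source(1 to 10)
        .map(index => HelloReply(index.toString))
        // each element is emitted at least 5 seconds after it enters the operator,
        // without blocking any thread in the meantime
        .delay(5.seconds, DelayOverflowStrategy.backpressure)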

The gory details: Akka HTTP/gRPC “sub-materializes” the returned stream into the HTTP server logic for performance reasons, so blocking in your stream will block the entire HTTP connection stream from doing any processing.

The sleep is just for testing; there is no delay logic in the actual code.

But I am still seeing that the akka-grpc server stream waits for all data to be completed before returning anything.

Other gRPC implementations can return data at any time, until the stream is actively terminated.

So, is this behavior expected?

Both Akka actors and streams are highly optimized for asynchronous logic; that’s why extra care needs to be taken not to block a thread, or to handle such cases safely.

For a server streaming out, without any such blocking logic, Akka gRPC (and HTTP) will emit the first element as soon as it has been produced and it is possible to hand it off to the network. Just like you expect.

    Source(1 to 10).map(_ => actorRef.ask(ref => GetCacheLog(logRequest, ref)))
                   .map(logResponseFuture => logResponseFuture.asInstanceOf[Future[LogResponse]])
                   .map(logResponseFuture => Await.result(logResponseFuture, timeout.duration))

That must be my problem. The snippet above is the real code I use:
I pass the request to the actorRef and return the response to the caller.
I used Await.result to extract the value from the Future.
Is the behavior caused by using this method?

If so, how should I extract the value from the Future in the response without blocking? I have been reading the documentation for a long time, but it does not map well onto this scenario.

Await.result blocks while waiting for the future to complete. Instead you should use one of the actor ask operators built into streams (see ActorFlow.ask • Akka Documentation), or do the ask in a mapAsync, which waits for the reply without blocking.
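A minimal sketch of both approaches, reusing actorRef, GetCacheLog, logRequest and LogResponse from the snippet above, and assuming actorRef is a typed ActorRef with the implicit Timeout and Scheduler required by the ask pattern already in scope:

    import akka.NotUsed
    import akka.actor.typed.ActorRef
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.stream.scaladsl.Source
    import akka.stream.typed.scaladsl.ActorFlow

    // Option 1: the ask operator built into streams (akka-stream-typed module)
    val viaActorFlow: Source[LogResponse, NotUsed] =
      Source(1 to 10)
        .via(ActorFlow.ask(actorRef) { (_: Int, replyTo: ActorRef[LogResponse]) =>
          GetCacheLog(logRequest, replyTo)
        })

    // Option 2: mapAsync with the non-blocking ask pattern; the parallelism of 4 is arbitrary
    val viaMapAsync: Source[LogResponse, NotUsed] =
      Source(1 to 10)
        .mapAsync(parallelism = 4)(_ => actorRef.ask[LogResponse](replyTo => GetCacheLog(logRequest, replyTo)))

Either way the stream receives already-typed LogResponse values, so no asInstanceOf or Await.result is needed.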


OK, thank you very much for your answer, I will try it now.