Akka gRPC with Akka HTTP Routes

Greetings, I need to serve gRPC on the same port as my Akka HTTP routes. The path I have taken so far is to instantiate the generated gRPC handler via partial(), then use it in a route as follows:

val echoService = EchoServiceHandler.partial(new EchoServiceImpl(mat))
...
extractRequest { r =>
  if (echoService.isDefinedAt(r)) complete(echoService(r)) else reject
} ~

This fails with the exception attached below. If I drop the isDefinedAt check and wire the route as just complete(echoService(r)), the request completes successfully.

Not sure if I am misusing the API or the generation of the gRPC handler leaves something to be desired. Thoughts?

java.lang.IllegalStateException: Substream Source(substream-out-1) cannot be materialized more than once
	at akka.stream.impl.fusing.SubSource$$anon$13.createMaterializedTwiceException(StreamOfStreams.scala:826)
	at akka.stream.impl.fusing.SubSource$$anon$13.<init>(StreamOfStreams.scala:797)
	at akka.stream.impl.fusing.SubSource.createLogic(StreamOfStreams.scala:793)
	at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:106)
	at akka.stream.stage.GraphStageWithMaterializedValue.createLogicAndMaterializedValue(GraphStage.scala:50)
	at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:701)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:500)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:442)
	at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:703)
	at akka.stream.scaladsl.Source.runWith(Source.scala:118)
	at akka.grpc.scaladsl.GrpcMarshalling$.unmarshal(GrpcMarshalling.scala:53)
	at com.example.protobuf.EchoServiceHandler$.$anonfun$partial$1(EchoServiceHandler.scala:91)
	at akka.grpc.scaladsl.GrpcMarshalling$.$anonfun$negotiated$2(GrpcMarshalling.scala:45)
	at scala.util.Success.map(Try.scala:262)
	at akka.grpc.scaladsl.GrpcMarshalling$.$anonfun$negotiated$1(GrpcMarshalling.scala:45)
	at scala.Option.map(Option.scala:242)
	at akka.grpc.scaladsl.GrpcMarshalling$.negotiated(GrpcMarshalling.scala:43)
	at com.example.protobuf.EchoServiceHandler$.handle$1(EchoServiceHandler.scala:86)
	at com.example.protobuf.EchoServiceHandler$.$anonfun$partial$5(EchoServiceHandler.scala:102)
	at scala.PartialFunction$Unlifted.applyOrElse(PartialFunction.scala:319)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
	at com.example.service.MainService.$anonfun$routes$2(Main.scala:120)
	at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
	at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
	at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
	at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$2(RouteConcatenation.scala:47)
	at akka.http.scaladsl.util.FastFuture$.strictTransform$1(FastFuture.scala:41)
	at akka.http.scaladsl.util.FastFuture$.transformWith$extension(FastFuture.scala:45)
	at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$mapRouteResultWith$2(BasicDirectives.scala:74)
	at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)
	at akka.http.scaladsl.server.directives.ExecutionDirectives.$anonfun$handleExceptions$2(ExecutionDirectives.scala:32)
	at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)
	at akka.http.scaladsl.server.Route$.$anonfun$asyncHandler$1(Route.scala:86)
	at akka.http.impl.engine.http2.Http2Blueprint$.$anonfun$handleWithStreamIdHeader$1(Http2Blueprint.scala:123)
	at akka.stream.impl.fusing.MapAsyncUnordered$$anon$31.onPush(Ops.scala:1401)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
	at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:56)
	at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:52)
	at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:95)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
	at akka.actor.Actor.aroundReceive(Actor.scala:535)
	at akka.actor.Actor.aroundReceive$(Actor.scala:533)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:575)
	at akka.actor.ActorCell.invoke(ActorCell.scala:545)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)

Hi @barlen,

I’ve used the g8 template (https://github.com/akka/akka-grpc-quickstart-scala.g8) to produce a simple project and edited the GreeterServer to include an Akka HTTP Route:


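    // Imported at this level so that HttpResponse and StatusCodes are also
    // visible to notFound and service below.
    import akka.http.scaladsl.model._

    // Plain Akka HTTP route, converted to an HttpRequest => Future[HttpResponse] handler.
    val plainHTMLRoute = Route.asyncHandler {
      import akka.http.scaladsl.server.Directives._
      path("hello") {
        get {
          complete(
            HttpEntity(
              ContentTypes.`text/html(UTF-8)`,
              "<h1>Say hello to akka-http</h1>"
            )
          )
        }
      }
    }
    val notFound =
      scala.concurrent.Future.successful(HttpResponse(StatusCodes.NotFound))
    // Try the gRPC handler first, then fall back to the plain HTML route.
    val service: HttpRequest => Future[HttpResponse] =
      GreeterServiceHandler
        .partial(new GreeterServiceImpl(mat))
        .orElse {
          PartialFunction.fromFunction(plainHTMLRoute)
        }
        // Safety net only: fromFunction is defined for every request,
        // so unmatched paths are answered by plainHTMLRoute before this case.
        .orElse { case _ => notFound }

    // Bind the combined handler with HTTP/2 enabled, which gRPC requires.
    val bound = Http().bindAndHandleAsync(
      service,
      interface = "127.0.0.1",
      port = 8080,
      connectionContext = HttpConnectionContext(http2 = Always)
    )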

You can see all the code in this GitHub repo.

You can start the server using sbt> runMain com.example.helloworld.GreeterServer and then try it out using curl -vvv http://localhost:8080/hello.
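
As for why the isDefinedAt check in the original route fails: judging by the PartialFunction$Unlifted frame in the stack trace, the generated partial handler appears to be built with Function.unlift, and for such a partial function isDefinedAt has to run the underlying function to find out whether it returns a value. If that is what happens here, the check already starts reading the request entity, and the following apply tries to read it a second time, which would match the "cannot be materialized more than once" error. Here is a minimal sketch of that behaviour using only the Scala standard library. UnliftDemo, handle and countCalls are made-up names for illustration, and a String stands in for the HttpRequest:

```scala
object UnliftDemo {
  // Hypothetical stand-in for the function wrapped by the generated handler.
  // The counter plays the role of "the request body was consumed".
  var calls = 0
  val handle: String => Option[String] = { path =>
    calls += 1
    if (path.startsWith("/echo")) Some(s"handled $path") else None
  }

  // A partial function built with Function.unlift, as the
  // PartialFunction$Unlifted frame in the stack trace suggests.
  val partial: PartialFunction[String, String] = Function.unlift(handle)

  // Runs `body` and reports how many times `handle` was invoked.
  def countCalls(body: => Any): Int = {
    calls = 0
    body
    calls
  }

  def main(args: Array[String]): Unit = {
    val req = "/echo/Say"

    // The pattern from the question: check first, then apply.
    val checkedThenApplied = countCalls {
      if (partial.isDefinedAt(req)) partial(req) else "rejected"
    }
    // Single-evaluation alternatives.
    val viaApplyOrElse = countCalls {
      partial.applyOrElse(req, (_: String) => "rejected")
    }
    val viaLift = countCalls { partial.lift(req) }

    println(s"isDefinedAt + apply: $checkedThenApplied call(s)") // 2
    println(s"applyOrElse:         $viaApplyOrElse call(s)")     // 1
    println(s"lift:                $viaLift call(s)")            // 1
  }
}
```

That would also explain why combining the handlers with orElse works: orElse goes through applyOrElse, so the gRPC handler sees each request only once. Inside a route, matching on echoService.lift(r) and completing or rejecting based on the resulting Option should presumably avoid the double evaluation in the same way, though I have not tried that variant.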

Hope that helps,

Oh that does help a lot. Thank you very much!
