Polling a Web Service in Java using Akka HTTP

I’ve done a lot of looking online and I’m having trouble finding relevant examples of how to begin. I’m writing some code that consumes a REST service. For this service, I make a POST request that starts a long-running operation. The service returns an identifier that I can use to poll for the state of the operation. I want to poll with that identifier until I get a success or failure from the service, or until I reach some sort of timeout condition.
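The poll-until-done loop described above can be sketched independently of the HTTP client. What follows is only a minimal sketch using the plain JDK rather than Akka HTTP: the names OperationPoller, State and checkStatus are hypothetical, and checkStatus stands in for whatever call asks the service for the operation's state by its identifier.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class OperationPoller {

    /** Hypothetical operation states; a real service defines its own. */
    enum State { PENDING, SUCCEEDED, FAILED }

    private OperationPoller() {}

    /**
     * Calls checkStatus repeatedly, waiting `interval` between attempts,
     * until it reports a terminal state or `timeout` has elapsed.
     * The deadline is only checked before each attempt, so it is approximate.
     */
    static CompletableFuture<State> pollUntilDone(
            Supplier<CompletionStage<State>> checkStatus,
            Duration interval,
            Duration timeout,
            ScheduledExecutorService scheduler) {
        CompletableFuture<State> result = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        pollOnce(checkStatus, interval, deadlineNanos, scheduler, result);
        return result;
    }

    private static void pollOnce(
            Supplier<CompletionStage<State>> checkStatus,
            Duration interval,
            long deadlineNanos,
            ScheduledExecutorService scheduler,
            CompletableFuture<State> result) {
        if (result.isDone()) {
            return; // the caller cancelled or completed the future already
        }
        if (System.nanoTime() - deadlineNanos >= 0) {
            result.completeExceptionally(
                    new TimeoutException("operation did not finish in time"));
            return;
        }
        CompletionStage<State> attempt;
        try {
            attempt = checkStatus.get();
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
            return;
        }
        attempt.whenComplete((state, error) -> {
            if (error != null) {
                // The status request itself failed; give up.
                result.completeExceptionally(error);
            } else if (state == State.PENDING) {
                try {
                    // Not finished yet: schedule the next attempt.
                    scheduler.schedule(
                            () -> pollOnce(checkStatus, interval, deadlineNanos, scheduler, result),
                            interval.toMillis(), TimeUnit.MILLISECONDS);
                } catch (RuntimeException e) {
                    // For example, the scheduler has been shut down.
                    result.completeExceptionally(e);
                }
            } else {
                // SUCCEEDED or FAILED: a terminal state, stop polling.
                result.complete(state);
            }
        });
    }
}
```

A caller would pass a supplier that issues the status request and maps the response to a State, then react to the returned future, for example by sending the result back to whoever asked for it. Inside an actor the same control flow could presumably be expressed with scheduled messages instead of a ScheduledExecutorService.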

My code is part of a Java-based system that uses Akka Actors, so it seemed to make sense to use Akka HTTP to make the requests. My code is meant to be a facade that other parts of the system can use to make requests of this backend REST service. I’ve got some example tests that use the Request-Level Client-Side API, but from reading the documentation, I suspect that I should be using the Host-Level Client-Side API instead, since I’ll be making repeated calls over a long period of time to a REST service outside the Actor system.

My thought on design was that I would have an Actor that could receive messages from other Actors and handle making the request to the backend service. When the service completes, the Actor would send a message back with the result. I thought I would use a cachedHostConnectionPool inside of this Actor so that I could efficiently handle connections without swamping the backend service. I am also considering following the pattern in the ‘Getting Started with Actors’ tutorial where they represented a query of a bunch of IoT devices with short-lived Actors that represented the individual queries and handled the grunt work.

To summarize, I’d have a ‘manager’ Actor that controls the pool. The ‘manager’ would create workers that would use a connection from the pool to handle individual requests from clients of the ‘manager’ Actor and perform the polling logic on the backend service to eventually return the response.

I’m not really clear on whether that makes sense when using the cachedHostConnectionPool, and I’m not even sure how to set up usage of that pool. I’ve read some Scala examples, but I don’t know that language well and I’m not familiar with how some of the objects and calling patterns map to their Java equivalents.

Am I understanding the documentation correctly? Or am I barking up the wrong tree here? Is there any example code out there that could get me started?

Thanks in advance for any help and I’m happy to clarify anything to the extent that I’m able to.

Update:

I’ve played around with this a little more and I think I’ve translated some of the Scala. I still can’t figure out how to hook the Source and Flow together. My idea with the Pair<HttpRequest, ActorRef> is that an actor could send a request via this Actor and the Actor would return the response to the associated requesting Actor. I have no idea if this makes sense though:

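import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.http.javadsl.HostConnectionPool;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Pair;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import scala.util.Try;

public class ConnectionManager extends AbstractActor {

  // Pool flow: each request travels with the ActorRef that should receive the response.
  private Flow<Pair<HttpRequest, ActorRef>, Pair<Try<HttpResponse>, ActorRef>, HostConnectionPool> connectionPool;
  // Queue source; its element type has to match the input type of the pool flow.
  private Source<Pair<HttpRequest, ActorRef>, SourceQueueWithComplete<Pair<HttpRequest, ActorRef>>> source;

  public static Props props(String host) {
    return Props.create(ConnectionManager.class, host);
  }

  public ConnectionManager(String host) {
    this.connectionPool = Http.get(context().system()).cachedHostConnectionPool(host);
    this.source = Source.queue(10, OverflowStrategy.dropNew());
  }

  @Override
  public Receive createReceive() {
    // Placeholder: no messages are handled yet.
    return receiveBuilder().build();
  }

}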

The next part of the Scala example has the source hooked up to the connection pool with via(), but the Java DSL doesn’t seem to have a via that will take a Flow; only some kind of Graph.

Hi Joseph,

I am trying to do the same thing. Below is my attempt, but I’m not sure this is the right way to do it: I only ever get a success response. I figured out why, but I don’t know the right approach.

final Http http = Http.get(system);
final Flow<Pair<HttpRequest, Object>, Pair<Try<HttpResponse>, Object>, HostConnectionPool> flow;
flow = http.cachedHostConnectionPool(ConnectHttp.toHost("host:port"));
final SourceQueueWithComplete<Object> queue = Source.queue(3, OverflowStrategy.dropNew())
    .map(d -> Pair.create(HttpRequest.create("contextpath"), d))
    .via(flow)
    .toMat(Sink.foreach(p -> p.first()), Keep.left())
    .run(materializer);