How to call Akka HTTP POST in a loop (1000-10000 times)?

I am learning Akka with Java. I have written a simple program with two actors.

My first actor, ActorA, is called with a list containing 1000 strings. ActorA loops through the list and calls ActorB for each element.

ActorB makes an HTTP POST call to an external service using the String parameter received from ActorA.

I expect ActorB to make 1000 HTTP POST calls successfully and to receive an equal number of responses. However, ActorB only manages somewhere between 80 and 120 POST requests (the number varies from run to run) and then stops making POST calls.

I tried providing a custom dispatcher, since I assumed the HTTP POST call is a blocking operation, but still no luck!

Refer to the code and configuration given below.

public class ActorA extends AbstractActor {
    static public Props props() {
        return Props.create(ActorA.class);
    }


    static public class IdWrapper {
        List<String> ids;

        public IdWrapper(List<String> ids) {
            this.ids = ids;
        }
    }


    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(IdWrapper.class, this::process)
                .build();
    }

    private void process(IdWrapper msg) {
        msg.ids.forEach(id -> {
            context().actorSelection("actorB").tell(new MessageForB(id), ActorRef.noSender());
            }
        );
    }
}
public class ActorB extends AbstractActor {   
    
    final Http http = Http.get(getContext().system());
    final Materializer materializer = ActorMaterializer.create(context());    

    public static Props props() {
        return Props.create(ActorB.class);
    }

    static public class MessageForB implements Serializable {
        String id;

        public MessageForB(String id) {
            this.id = id;
        }
    }


    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(MessageForB.class, this::process)
                .build();
    }

    private void process(MessageForB msg) {

        ExecutionContext ec = getContext().getSystem().dispatchers().lookup("my-blocking-dispatcher");
        /**
         * Get id from request
         */
        String reqId = msg.id;

        /**
         * Prepare request
         */
        XmlRequest requestEntity = getRequest(Stream.of(reqId).collect(Collectors.toList()));
        String requestAsString = null;


        try {
            /**
             * Create and configure JAXBMarshaller.
             */
            JAXBContext jaxbContext = JAXBContext.newInstance(XmlRequest.class);
            Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
            jaxbMarshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);

            /**
             * Convert request entity to string before making POST request.
             */
            StringWriter sw = new StringWriter();
            jaxbMarshaller.marshal(requestEntity, sw);
            requestAsString = sw.toString();

        } catch (JAXBException e) {
            e.printStackTrace();
        }


        /**
         * Create RequestEntity from request string.
         */
        RequestEntity entity = HttpEntities.create(
                MediaTypes.APPLICATION_XML.toContentType(HttpCharsets.ISO_8859_1),
                requestAsString);
        
        /**
         * Create Http POST with necessary headers and call
         */
        final CompletionStage<HttpResponse> responseFuture =
                http.singleRequest(HttpRequest.POST("http://{hostname}:{port}/path")
                        .withEntity(entity));

        responseFuture
                .thenCompose(httpResponse -> {
                    /**
                     * Convert response into String
                     */
                    final CompletionStage<String> res = Unmarshaller.entityToString().unmarshal
                            (httpResponse.entity(), ec, materializer);
                    /**
                     * Consume response bytes
                     */
                    httpResponse.entity().getDataBytes().runWith(Sink.ignore(), materializer);
                    return res;

                })
                .thenAccept(s -> {
                    
                    try {
                        /**
                         * Deserialize string to DTO.
                         */
                        MyResponse myResponse = getMyResponse(s);

                        // further processing..

                    } catch (JAXBException e) {
                        e.printStackTrace();
                    }
                });
    }

    private XmlRequest getRequest(List<String> identifiers){
        XmlRequest request = new XmlRequest();
        // Business logic to create req entity
        return request;
    }
	
    private MyResponse getMyResponse(String s) throws JAXBException {
        JAXBContext jaxbContext = JAXBContext.newInstance
                (MyResponse.class);
        javax.xml.bind.Unmarshaller jaxbUnmarshaller = jaxbContext
                .createUnmarshaller();
        StringReader reader = new StringReader(s);
        return (MyResponse)
                jaxbUnmarshaller.unmarshal(reader);
    }
}

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 5
    core-pool-size-max = 20
  }
  throughput = 1
}

Where can I improve or correct my code so that ActorB successfully makes HTTP POST calls for all the items sent by ActorA?

Hi @Gurunath,

Hard to say. Maybe exceptions are being thrown somewhere unexpected and you don't know about them? You seem to be throwing away the result of responseFuture...thenAccept, so you won't see any exceptions thrown while processing the response.
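To illustrate that point, here is a minimal plain-JDK sketch (no Akka involved; the class name `SwallowedFailureDemo` and the sample body are made up for the example). It shows that an exception thrown inside `thenAccept` only becomes visible if something observes the stage that `thenAccept` returns:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of the code in the question.
class SwallowedFailureDemo {

    // Runs a callback that throws and returns what an attached observer saw.
    static String observe(boolean attachObserver) {
        AtomicReference<String> seen = new AtomicReference<>("nothing reported");

        // Stand-in for a response body that has already arrived.
        CompletableFuture<String> response = CompletableFuture.completedFuture("<broken>");

        // The callback throws, so the stage returned by thenAccept fails.
        CompletableFuture<Void> processed = response.thenAccept(body -> {
            throw new IllegalStateException("cannot parse " + body);
        });

        if (attachObserver) {
            processed.whenComplete((ignored, error) -> {
                if (error != null) {
                    // Failures from a callback arrive wrapped in CompletionException.
                    Throwable root = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    seen.set(root.getMessage());
                }
            });
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(observe(false)); // prints "nothing reported"
        System.out.println(observe(true));  // prints "cannot parse <broken>"
    }
}
```

Without the observer the failure is simply dropped, which would match the "it just stops" symptom.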

You can try enabling debug logging, which should give you detailed output about what the HTTP connection pool is doing.
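A minimal sketch of such a setting in application.conf, assuming the default Akka logger is used (with an SLF4J backend, that backend's own level would also have to allow DEBUG):

```
akka {
  loglevel = "DEBUG"
}
```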

Which versions of Akka and Akka HTTP do you use?

Johannes

Hi @jrudolph, thanks for your response!

I added the following block of code after thenAccept, which gave me a clear picture of what is going wrong.

.whenComplete((aVoid, throwable) -> {
                    System.out.println("Inside whenComplete");
                    System.out.println(aVoid);
                    // throwable is null when the stage completed normally
                    if (throwable != null) {
                        System.out.println(throwable.getMessage());
                    }
        });

The exception thrown by Akka is as follows:

akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [32]. This means that the request queue of this pool...

To overcome this issue, I increased max-open-requests to 1024 in the configuration.
Please suggest if there is a better way to handle this.

akka.http {
  host-connection-pool {
    max-open-requests = 1024
  }
}

I am using the following dependencies.

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.12</artifactId>
    <version>2.5.11</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_2.12</artifactId>
    <version>10.0.11</version>
</dependency>

Most likely you are not completely reading the responses, which means the client connection pool might not make progress fast enough to keep up with the rate at which you submit requests to it. See https://doc.akka.io/docs/akka-http/current/implications-of-streaming-http-entity.html for more information about that kind of mistake.

Can you try updating to akka-http 10.1.7? We fixed some issues in the pool in 10.1.x, which make it less likely to fail with that exception.

If you know up front about all the requests you are going to run, you can also use the streaming Akka HTTP client API to avoid issuing too many requests to the pool at the same time and overflowing it. See https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-the-host-level-api-with-a-queue for more information about that.
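The idea of capping how many requests are outstanding at once can be sketched with plain JDK classes. This is not the Akka HTTP API from the linked page, only a stand-in for the same principle: `BoundedRequestsDemo`, `fakePost`, and `runBounded` are hypothetical names, and `fakePost` merely simulates an asynchronous POST. A semaphore makes the submitter wait whenever `limit` calls are already in flight:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical demo class; plain JDK, no Akka API involved.
class BoundedRequestsDemo {
    // Counters used only to show how many calls overlap.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Made-up stand-in for an asynchronous HTTP POST.
    static CompletableFuture<String> fakePost(String id) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1); // simulate network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "response-" + id;
        }).whenComplete((result, error) -> inFlight.decrementAndGet());
    }

    // Starts one call per id, but never more than `limit` at the same time.
    static <T> List<T> runBounded(List<String> ids, int limit,
                                  Function<String, CompletableFuture<T>> call) {
        Semaphore permits = new Semaphore(limit);
        List<CompletableFuture<T>> pending = new ArrayList<>();
        for (String id : ids) {
            permits.acquireUninterruptibly(); // wait here until a slot is free
            CompletableFuture<T> started;
            try {
                started = call.apply(id);
            } catch (RuntimeException e) {
                permits.release(); // the call never started, give the slot back
                throw e;
            }
            // Free the slot when the call finishes, whether it succeeded or failed.
            pending.add(started.whenComplete((result, error) -> permits.release()));
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> f : pending) {
            results.add(f.join()); // rethrows the failure of any call
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add("id-" + i);
        }
        List<String> responses = runBounded(ids, 16, BoundedRequestsDemo::fakePost);
        System.out.println(responses.size() + " responses, at most "
                + maxInFlight.get() + " in flight at once");
    }
}
```

Note that the semaphore blocks the submitting thread, so this shape should not be used inside an actor's receive method. The streaming API linked above gets the same bound through backpressure rather than blocking.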