Continuously growing response time with cachedHostConnectionPool

Hi there,

I use a cachedHostConnectionPool for sending requests to the backend:

val queue = {
  val poolClientFlow = {
    val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)
    logger.debug("Pool created")
    pool
  }

  // Each request is paired with a Promise that is completed with the pool's result,
  // so callers get back a Future[HttpResponse].
  val q = Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach {
      case (Success(resp), p) => p.success(resp)
      case (Failure(e), p)    => p.failure(e)
    })(Keep.left)
    .run()
  logger.debug(s"Q created: $q")
  q
}

In a test I send many parallel requests to the backend to measure performance:

val vertices = Await.result(
  Future.sequence(for (_ <- 0 to 50) yield ogitApi.Vertex.create(graphId, genPersonVertex)),
  timeout)

val s = Future.sequence(vertices.zipWithIndex.map {
  case (v, n) =>
    val t = System.currentTimeMillis()
    for {
      resp <- ogitApi.Vertex.saveEventsData(graphId, v, data)
    } yield {
      val d = System.currentTimeMillis() - t
      logger.debug(s"Done - $n in $d ms. Response: $resp")
      d shouldBe <(maxResponseTime)
      resp
    }
})

I noticed that the response time keeps growing:

2018-06-06 20:51:42 [DEBUG] - Done - 1 in 814 ms. Response: true
2018-06-06 20:51:42 [DEBUG] - Done - 3 in 820 ms. Response: true
2018-06-06 20:51:42 [DEBUG] - Done - 0 in 845 ms. Response: true
2018-06-06 20:51:42 [DEBUG] - Done - 2 in 907 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 4 in 1451 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 5 in 1452 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 6 in 1459 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 7 in 1537 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 8 in 2085 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 9 in 2089 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 10 in 2099 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 11 in 2167 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 12 in 2715 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 14 in 2718 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 13 in 2726 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 15 in 2785 ms. Response: true
2018-06-06 20:51:45 [DEBUG] - Done - 16 in 3335 ms. Response: true
2018-06-06 20:51:45 [DEBUG] - Done - 17 in 3341 ms. Response: true
2018-06-06 20:51:45 [DEBUG] - Done - 18 in 3362 ms. Response: true
...

On the backend side the processing time stays the same:

data saved in 47
data saved in 621
data saved in 618
data saved in 613
data saved in 614
data saved in 629
data saved in 627
data saved in 624
data saved in 624
data saved in 619
data saved in 612
data saved in 622
data saved in 613
data saved in 612

As I understand it, response time = time spent in the queue + (request -> response) time. Is that right?

How can I make this faster? I’ve played with the configuration settings but haven’t made any real progress.

In the code where you are sending the parallel requests I can’t see you using Akka HTTP at all. Is saveEventsData using the cached connection pool? What does that code look like?

val queue = {
  val poolClientFlow = {
    val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)
    logger.debug("Pool created")
    pool
  }

  // Each request is paired with a Promise that is completed with the pool's result,
  // so callers get back a Future[HttpResponse].
  val q = Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach {
      case (Success(resp), p) => p.success(resp)
      case (Failure(e), p)    => p.failure(e)
    })(Keep.left)
    .run()
  logger.debug(s"Q created: $q")
  q
}

Yeah, I saw that, but the part with the timing doesn’t actually use poolClientFlow or q, does it?

The saveEventsData method builds the request and sends it through the queue.

This is the code that adds a request to the queue:

def queueRequest(request: HttpRequest)(implicit ctx: ExecutionContext): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    queue.offer(request -> responsePromise).flatMap {
      // Accepted: the promise is completed by the Sink.foreach stage above.
      case QueueOfferResult.Enqueued => responsePromise.future

      // Dropped because the queue is full (OverflowStrategy.dropNew), so retry.
      case QueueOfferResult.Dropped =>
        withRetry()(queueRequest(request))

      case QueueOfferResult.Failure(ex) =>
        logger.error(s"Error processing request ${request.method.name()} - ${request.getUri()}. Retrying.", ex)
        withRetry()(queueRequest(request))

      case QueueOfferResult.QueueClosed =>
        Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
    }
  }
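
For context, a saveEventsData built on top of this queue might look roughly like the sketch below. The real method is not shown in this thread, so the URI path, the entity and the Boolean result are made-up placeholders.

import akka.http.scaladsl.model._
import akka.stream.Materializer

// Purely illustrative sketch, not the actual OGIT client code: build a request
// for the vertex and hand it to queueRequest above. The path is a placeholder.
def saveEventsData(graphId: String, vertexId: String, data: String)
                  (implicit ctx: ExecutionContext, mat: Materializer): Future[Boolean] = {
  val request = HttpRequest(
    method = HttpMethods.POST,
    uri = s"/placeholder/$graphId/vertices/$vertexId/events",
    entity = HttpEntity(ContentTypes.`application/json`, data))
  queueRequest(request).map { resp =>
    resp.discardEntityBytes() // drain the body so the pooled connection is freed
    resp.status.isSuccess()
  }
}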

In my case the measured time is the time spent in the queue plus the request-response time.

Alright, that makes it a bit clearer :)

By default the client pool is configured to open no more than 4 connections per host, and that is likely what you are seeing: judging from the timing logs, the responses come back in batches of four, each batch taking the ~620 ms server time plus some network overhead. You can change this with akka.http.host-connection-pool.max-connections.
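
For example (a sketch only; the value 32 is arbitrary, and the same line can live in application.conf instead of being set programmatically):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Raise the per-host connection limit (default is 4) for the system running the pool.
val config = ConfigFactory
  .parseString("akka.http.host-connection-pool.max-connections = 32")
  .withFallback(ConfigFactory.load())

implicit val system: ActorSystem = ActorSystem("client-system", config)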

In general I’d recommend using mapAsync + Http.singleRequest over basically reimplementing those two yourself, just for simplicity.
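
A minimal sketch of that approach, assuming the HttpRequests are already built (sendAll and the parallelism parameter are illustrative names, not an existing API):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("client-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// mapAsync bounds the parallelism; Http().singleRequest goes through the same
// cached per-host pool under the hood, so no hand-rolled queue is needed.
def sendAll(requests: Seq[HttpRequest], parallelism: Int): Future[Seq[HttpResponse]] =
  Source(requests.toList)
    .mapAsync(parallelism)(Http().singleRequest(_))
    .runWith(Sink.seq)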

Thanks. I tried mapAsync with Http.singleRequest. The problem is that singleRequest creates a pool as well, and that takes some time. I need to process many requests in a row as fast as possible.

I’ve increased akka.http.host-connection-pool.max-connections and it helped a bit. I thought there might be some more tricks to speed up the processing.

singleRequest should only create the per-host pool on the first request and then reuse the same cached pool, so I don’t think you should see much of a performance difference because of that.

If I were you, I’d look into whether it is possible to avoid the Source.queue part and have the elements stream all the way from where they are generated. Also, make sure to verify that the client really is the bottleneck and that you are not already saturating the network or the server.
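
A rough sketch of how that streaming could look, reusing the names from the code above (buildSaveEventsRequest is a made-up helper, and the vertex is assumed to be a plain String id here just to give the pool flow a type parameter):

// No Source.queue and no Promise bookkeeping: stream the vertices straight
// through the cached pool flow. buildSaveEventsRequest is hypothetical.
Source(vertices.toList)
  .map(v => buildSaveEventsRequest(graphId, v, data) -> v)
  .via(Http().cachedHostConnectionPool[String](host, port))
  .runWith(Sink.foreach {
    case (Success(resp), v) =>
      resp.discardEntityBytes()
      logger.debug(s"Saved events for vertex $v: ${resp.status}")
    case (Failure(e), v) =>
      logger.error(s"Saving events for vertex $v failed", e)
  })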

Removing Source.queue looks like a good idea, I will check. Thank you