Continuously growing response time with cachedHostConnectionPool


(Pavel Tsipinio) #1

Hi all,

I use a cachedHostConnectionPool to send requests to the backend:

import scala.concurrent.Promise
import scala.util.{Failure, Success}

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

val poolClientFlow = {
  val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)
  logger.debug("Pool created")
  pool
}

// Requests offered to the queue are pushed through the pool; each response
// completes the Promise that was carried along with the request.
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
  .via(poolClientFlow)
  .toMat(Sink.foreach {
    case (Success(resp), p) => p.success(resp)
    case (Failure(e), p)    => p.failure(e)
  })(Keep.left)
  .run()
logger.debug(s"Q created: $queue")

In a test, I send many parallel requests to the backend to measure performance.

val vertices = Await.result(
  Future.sequence(for (_ <- 0 to 50) yield ogitApi.Vertex.create(graphId, genPersonVertex)),
  timeout)

val s = Future.sequence(vertices.zipWithIndex.map {
  case (v, n) =>
    val t = System.currentTimeMillis()
    for {
      resp <- ogitApi.Vertex.saveEventsData(graphId, v, data)
    } yield {
      val d = System.currentTimeMillis() - t
      logger.debug(s"Done - $n in $d ms. Response: $resp")
      d shouldBe <(maxResponseTime)
      resp
    }
})

I noticed that the response time grows continuously:

2018-06-06 20:51:42 [DEBUG] - Done - 1 in 814 ms. Response: true
2018-06-06 20:51:42 [DEBUG] - Done - 3 in 820 ms. Response: true
2018-06-06 20:51:42 [DEBUG] - Done - 0 in 845 ms. Response: true
2018-06-06 20:51:42 [DEBUG] - Done - 2 in 907 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 4 in 1451 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 5 in 1452 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 6 in 1459 ms. Response: true
2018-06-06 20:51:43 [DEBUG] - Done - 7 in 1537 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 8 in 2085 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 9 in 2089 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 10 in 2099 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 11 in 2167 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 12 in 2715 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 14 in 2718 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 13 in 2726 ms. Response: true
2018-06-06 20:51:44 [DEBUG] - Done - 15 in 2785 ms. Response: true
2018-06-06 20:51:45 [DEBUG] - Done - 16 in 3335 ms. Response: true
2018-06-06 20:51:45 [DEBUG] - Done - 17 in 3341 ms. Response: true
2018-06-06 20:51:45 [DEBUG] - Done - 18 in 3362 ms. Response: true
...

On the backend side, the processing time stays roughly constant:

data saved in 47
data saved in 621
data saved in 618
data saved in 613
data saved in 614
data saved in 629
data saved in 627
data saved in 624
data saved in 624
data saved in 619
data saved in 612
data saved in 622
data saved in 613
data saved in 612

As I understand it, response time = time in queue + (request -> response) time. Is that right?

How can I make this faster? I’ve played with the configuration settings but haven’t made any real progress.


(Johan Andrén) #2

In the code where you are sending parallel requests I can’t see that you are using Akka HTTP at all. Is saveEventsData using the cached connection pool? What does that code look like?


(Pavel Tsipinio) #3
val poolClientFlow = {
  val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)
  logger.debug("Pool created")
  pool
}

val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
  .via(poolClientFlow)
  .toMat(Sink.foreach {
    case (Success(resp), p) => p.success(resp)
    case (Failure(e), p)    => p.failure(e)
  })(Keep.left)
  .run()
logger.debug(s"Q created: $queue")

(Johan Andrén) #4

Yeah I saw that, but the part with the timing doesn’t actually use poolClientFlow or the queue?


(Pavel Tsipinio) #5

The saveEventsData method builds the request and sends it through the queue.

This is the code that adds a request to the queue:

import scala.concurrent.{ExecutionContext, Future, Promise}
import akka.stream.QueueOfferResult

def queueRequest(request: HttpRequest)(implicit ctx: ExecutionContext): Future[HttpResponse] = {
  val responsePromise = Promise[HttpResponse]()
  queue.offer(request -> responsePromise).flatMap {
    case QueueOfferResult.Enqueued =>
      responsePromise.future

    case QueueOfferResult.Dropped =>
      withRetry()(queueRequest(request))

    case QueueOfferResult.Failure(ex) =>
      logger.error(s"Error processing request ${request.method.name()} - ${request.getUri()}. Retrying.", ex)
      withRetry()(queueRequest(request))

    case QueueOfferResult.QueueClosed =>
      Future.failed(new RuntimeException(
        "Queue was closed (pool shut down) while running the request. Try again later."))
  }
}

In my case the measured time consists of the time spent in the queue plus the request-response time.
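
That split could be verified with a rough sketch like the following (an instrumented variant of the queue above, not the production code): carry the enqueue timestamp with each request and log the wait when the element leaves the queue. Internal stream buffering makes the measured wait approximate.

val instrumentedQueue =
  Source.queue[(HttpRequest, Promise[HttpResponse], Long)](QueueSize, OverflowStrategy.dropNew)
    .map { case (req, promise, enqueuedAt) =>
      // Runs when the element is pulled out of the queue buffer towards the pool.
      logger.debug(s"Waited ${System.currentTimeMillis() - enqueuedAt} ms in queue")
      req -> promise
    }
    .via(poolClientFlow)
    .toMat(Sink.foreach {
      case (Success(resp), p) => p.success(resp)
      case (Failure(e), p)    => p.failure(e)
    })(Keep.left)
    .run()

// Offer with the current time attached:
// instrumentedQueue.offer((request, responsePromise, System.currentTimeMillis()))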


(Johan Andrén) #6

Alright, that makes it a bit clearer :)

The client pool is by default configured to use no more than 4 connections per host, which is likely what you are seeing: judging from the timing logs, the responses come back in batches of four, each batch taking the ~620 ms server time plus some network overhead. You can change that with akka.http.host-connection-pool.max-connections.
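
For example, in application.conf (32 is just an example value, tune it to what your backend can handle; note that max-open-requests must stay a power of 2 that is at least max-connections):

akka.http.host-connection-pool {
  max-connections = 32
  max-open-requests = 64
}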

In general I’d recommend using mapAsync + Http().singleRequest over basically reimplementing those two yourself, just for simplicity.
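
Something like this minimal sketch (runAll, requests and parallelism are placeholder names): mapAsync bounds the concurrency, and Http().singleRequest goes through the shared host connection pool under the hood.

import scala.collection.immutable
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

def runAll(requests: immutable.Seq[HttpRequest], parallelism: Int)(
    implicit system: ActorSystem, mat: ActorMaterializer): Future[immutable.Seq[HttpResponse]] =
  Source(requests)
    .mapAsync(parallelism)(Http().singleRequest(_))
    .runWith(Sink.seq)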


(Pavel Tsipinio) #7

Thanks. I tried mapAsync with Http().singleRequest. The problem is that singleRequest creates a pool as well, and that takes some time. I need to process many requests in a row as fast as possible.

I’ve increased akka.http.host-connection-pool.max-connections and it helped a bit. I thought there might be some more tricks to speed up processing further.