Is there any proper way to gracefully shutdown Source.queue?

Hi,
I have a Akka Http Request Queue as below and try to gracefully shutdown it. I have tried many ways but couldn’t find any good solution yet. I have made a solution with AtomicInteger’s in Promises.onComplete method, and it works fine. But i think, there should be an Akka way to do it.

Here is the sample code that i try to make it work:

  private val (sourceQueue, killSwitch) = createSourceQueue(connectionPool)

  private def createSourceQueue(connectionPool: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool]): (SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])], UniqueKillSwitch) = {
    Source.queue[(HttpRequest, Promise[HttpResponse])](MAX_QUEUE_SIZE, OverflowStrategy.dropNew)
      .via(connectionPool)
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.foreach({
        case (Success(resp), p) =>
          p.success(resp)
        case (Failure(e), p) =>
          p.failure(e)
      }))(Keep.left)
      .run()
  }

  def shutdown(timeout: FiniteDuration) {
    try {
      killSwitch.shutdown()
      // 
      // Following code does not awaits queue completion!
      //
      // + `watchCompletion` does not return real completion future.
      // + Also i have tried with `done` future result of `run()` method.
      Await.result(sourceQueue.watchCompletion(), timeout)
    } finally {
      system.terminate()
    }
  }

The queue materializes a SourceQueueWithComplete, which you can .offer() elements to, but which you can also actively .complete(). Is that what you are looking for?

Hi Arnout,

Actually i have tried before, and also didn’t work :frowning:

Here is the complete sample code that i use:

AkkaHttpClient.scala

import java.net.URI
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpMethods, _}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContextExecutor, Future, Promise}
import scala.util.{Failure, Success, Try}

class AkkaHttpClient(uri: String, successPredicate: HttpResponse => Boolean) {
  var retryable = true
  var maxRetryAttempts = 3

  private val MAX_QUEUE_SIZE = Int.MaxValue
  private val RUN_FUTURE_FIRST_ATTEMPT = 1

  private val parsedUri = new URI(uri)

  private implicit val system: ActorSystem = ActorSystem()
  private implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)
  private implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  private var terminated = false

  private val connectionPool = createCachedConnectionPool(parsedUri)
  private val sourceQueue = createSourceQueue(connectionPool)

  def submitBody(body: String,
                 method: HttpMethod = HttpMethods.POST,
                 contentType: ContentType.NonBinary = ContentTypes.`application/json`) {
    val request = HttpRequest(method, uri, entity = HttpEntity(contentType, body))

    val responseFuture: Future[HttpResponse] = queueRequest(request)
    if (retryable)
      retryFuture(responseFuture)
  }

  def shutdown(timeout: FiniteDuration) {
    sourceQueue.complete()
    try {
      // Does not awaits :(
      Await.result(sourceQueue.watchCompletion(), timeout)
    } catch {
      case _: Throwable =>
    } finally {
      terminated = true
      system.terminate()
    }
  }

  private def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    sourceQueue.offer(request -> responsePromise).flatMap {
      case QueueOfferResult.Enqueued => responsePromise.future
      case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
    }
  }

  def retryFuture(target: Future[HttpResponse]): Unit = {
    retryFuture(target, RUN_FUTURE_FIRST_ATTEMPT)
  }

  private def retryFuture(target: Future[HttpResponse], attempt: Int): Future[HttpResponse] = {
    target.recoverWith {
      case _ =>
        if (!terminated && attempt <= maxRetryAttempts) {
          println("retrying..")
          retryFuture(target, attempt + 1)
        }
        else {
          println("failed (again)!")
          Future.failed(new Exception)
        }
    }
  }

  private def createCachedConnectionPool(uri: URI): Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] = {
    if (uri.getScheme == "http")
      Http().cachedHostConnectionPool[Promise[HttpResponse]](uri.getHost, if (uri.getPort == -1) 80 else uri.getPort)
    else
      Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](uri.getHost, if (uri.getPort == -1) 443 else uri.getPort)
  }

  private def createSourceQueue(connectionPool: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool])
  : SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] = {
    Source.queue[(HttpRequest, Promise[HttpResponse])](MAX_QUEUE_SIZE, OverflowStrategy.dropNew)
      .via(connectionPool)
      .toMat(Sink.foreach({
        case (Success(resp), p) =>
          if (successPredicate.apply(resp)) {
            println("completed!")
            p.success(resp)
          }
          else {
            println("really failed!")
            p.failure(new Exception("REsult is not satisfied: " + resp.toString()))
          }
        case (Failure(e), p) => p.failure(e)
      }))(Keep.left)
      .run()
  }
}

AkkaHttpClientTest.scala

import akka.http.scaladsl.model.StatusCodes
import org.apache.commons.lang3.time.StopWatch
import org.scalatest.FunSuite

import scala.concurrent.duration._

class AkkaHttpClientTest extends FunSuite {
  test("make a few requests!") {
    val stopWatch = StopWatch.createStarted()
    val client = new AkkaHttpClient("https://akka.io", resp => resp.status == StatusCodes.Accepted)
    (1 to 10).foreach(number => {
      client.submitBody(s"test $number!")
    })
    client.shutdown(2.minutes)
    println(s"Time passed: ${stopWatch.getTime} ms")
  }
}

When i execute the test, it does not awaits queue completion :expressionless:

test 1!
test 2!
test 3!
test 4!
test 5!
test 6!
test 7!
test 8!
test 9!
test 10!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
Time passed: 3158 ms
really failed!
failed (again)!

Process finished with exit code 0

There are 6 matches of failed! at the log, but there are 10 requests :unamused: Result is random :expressionless:

(edit: For a note, disabling retry mechanism does not change the result)

Ah - indeed, if I’m not mistaken, watchCompletion resolves when the Queue stage of your stream has completed, meaning either all elements have been accepted and passed downstream (or downstream signalled that it did not need any further data). This does not mean downstream actually finished processing the elements.

If you want to wait until all elements you added to the queue were successfully processed, you want to observe this at the end of the stream. For this, you can use the materialized value of your Sink.foreach: replace the Keep.left with Keep.both and use the Future[Done] to find out when the stream has completed all the way to the sink.

Hi Arnout,

I was tried with a killswitch didn’t worked before but now when i try with .complete() method in combination of Sink completion Future[Done], it worked!

Thank you for your time!

2 Likes