Shutdown actor system with blocking calls

Hi Akka Team, in our project we are facing issues when shutting down an actor system
that has threads blocked in Await.

I know that I should avoid blocking calls inside the actor system, but unfortunately that is not always possible, especially in hosted environments like Flink or Spark, which already provide their own abstraction for running distributed computations.

In my particular case we have a Flink pipeline that uses Akka/Akka HTTP to fetch data from the backend.
Flink stages (except (Rich)AsyncOperator) usually require a message to be passed through the pipeline synchronously, which forces us to block on Await, i.e.

    Await.result(Http()(system).singleRequest(request), Duration.Inf)

The problem appears when there is an exception within the Flink pipeline and Flink tries to restart the job. Initially, Flink tries to gracefully shut down the pipeline operators while keeping the JVM running, but if there are blocking calls the actor system usually does not shut down cleanly.

The issue becomes even worse when Flink times out waiting for the operators to complete and starts removing the temporary files that were used to run the job. The actor system threads that are still running then start logging millions of NoClassDefFound error messages and keep the CPUs busy.

I created a small reproducer in which the actor system never shuts down properly:


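import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, StatusCodes, Uri}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._

object AppConfig {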
  val actorSystemConfig = ConfigFactory.parseString(
    """
      akka {
        loglevel = DEBUG
        loggers = ["akka.event.slf4j.Slf4jLogger"]
     }
    """
  )
}

object ReproducerApp extends App {

  implicit val system = ActorSystem("actor-system", AppConfig.actorSystemConfig)
  import system.dispatcher

  val exceptionHandler: ExceptionHandler = ExceptionHandler {
    case ex: Throwable =>
      println(s"Got exception: ${ex.getMessage}")
      complete(StatusCodes.OK)
  }

  val route: Route =
    handleExceptions(exceptionHandler) {
      path("test") {
        println("Got request")
        withoutRequestTimeout {
          val promise = Promise[String]()

          system.scheduler.scheduleOnce(10.seconds) {
            promise.success("OK")
          }

          complete {
            promise.future
          }
        }
      }
    }

  val bindingFuture = Http().newServerAt("127.0.0.1", 8080).bind(route)
  System.in.read()
}

object ReproducerClientApp extends App {
  implicit val system = ActorSystem("actor-system", AppConfig.actorSystemConfig)
  val request =
    HttpRequest(
      method = HttpMethods.GET,
      uri    = Uri("http://127.0.0.1:8080/test")
    )

  Source.fromIterator(() => (1 to 100).toList.iterator)
    .map { _ =>
      Await.result(Http()(system).singleRequest(request), Duration.Inf)
    }.map(_.discardEntityBytes()).runWith(Sink.ignore)

  Thread.sleep(4000)

  val result = system.terminate()

  result.andThen {
    case s => println(s)
  }(ExecutionContext.global)

}

ReproducerClientApp never terminates the actor system: some dispatcher threads are blocked in Await, and no new task will ever be scheduled to deliver a message to the waiting Akka HTTP stage, so the default/internal dispatcher threads keep running. The actors that are still alive prevent the Akka dispatchers from shutting down, so it is a kind of deadlock.

As a workaround I tried to interrupt all threads like this:

  val threadSet = Thread.getAllStackTraces.keySet
  threadSet.asScala.foreach { t =>
    t.interrupt()
  }

This did the trick: the actor system terminates, logging this error:

[2020-08-17 10:59:00.336 +0000] ERROR - akka://actor-system/system/Materializers/StreamSupervisor-0/flow-0-0-ignoreSink: interruption during creation
akka.actor.ActorInitializationException: akka://actor-system/system/Materializers/StreamSupervisor-0/flow-0-0-ignoreSink: interruption during creation
	at akka.actor.ActorInitializationException$.apply(Actor.scala:196)
...
Caused by: java.lang.InterruptedException: null
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:207)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)

I looked into the Akka shutdown process, which is already quite complicated, but do you think it would make sense to have a built-in mechanism that forces shutdown by interrupting threads after some timeout? Or maybe you have other suggestions for this situation?

Regards,
Kyrylo

I think that would be difficult to implement in a correct and reliable way. It is probably better for applications that have this need to implement some custom interrupt handling for their blocking tasks. That could be triggered by a custom task in Akka’s CoordinatedShutdown.
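
To make the "custom interrupt handling" idea concrete, here is a minimal sketch that uses only the JDK and the Scala standard library. `BlockingRegistry` and `BlockingRegistryDemo` are hypothetical names, not part of Akka. The idea is to wrap each blocking call so that the calling thread is tracked, and to interrupt only the tracked threads on shutdown instead of every thread in the JVM.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Hypothetical helper (not an Akka API): remembers which threads are
// currently inside a blocking call so that only those get interrupted.
object BlockingRegistry {
  private val blocked = ConcurrentHashMap.newKeySet[Thread]()

  // Wrap every blocking call in this method.
  def blocking[T](body: => T): T = {
    val t = Thread.currentThread()
    blocked.add(t)
    try body
    finally blocked.remove(t)
  }

  // Interrupts all threads currently inside `blocking`;
  // returns how many threads were interrupted.
  def interruptAll(): Int = {
    var n = 0
    val it = blocked.iterator()
    while (it.hasNext) { it.next().interrupt(); n += 1 }
    n
  }
}

object BlockingRegistryDemo {
  // Returns (number of interrupted threads, whether the worker saw the interrupt).
  def run(): (Int, Boolean) = {
    val never        = Promise[String]().future // never completed, like a lost response
    val entered      = new CountDownLatch(1)
    val sawInterrupt = new AtomicBoolean(false)

    val worker = new Thread(() => {
      try {
        BlockingRegistry.blocking {
          entered.countDown()
          Await.result(never, Duration.Inf)
        }
        ()
      } catch { case _: InterruptedException => sawInterrupt.set(true) }
    })
    worker.start()

    entered.await()                       // worker is registered from here on
    val n = BlockingRegistry.interruptAll()
    worker.join(5000)
    (n, sawInterrupt.get)
  }
}
```

`BlockingRegistry.interruptAll()` is the piece that a custom shutdown task would call; wiring it into CoordinatedShutdown is not shown here. Note that a thread may leave `blocking` just before the interrupt arrives, so callers should be prepared for a stray interrupt flag.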

You could try abort instead of terminate, but I have low expectations that it will help.

system.asInstanceOf[akka.actor.ExtendedActorSystem].abort()

Thanks @patriknw for the suggestion, but abort does not solve this issue.

Sure, we can add custom logic to interrupt all threads, but the proper way would probably be to call shutdownNow on the ExecutorService in this case (whereas Akka uses a plain shutdown).

public interface ExecutorService extends Executor {

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();
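
The difference between the two calls can be seen on a plain JDK thread pool, independent of Akka. The following is only a sketch; `ShutdownNowDemo` is a hypothetical name, and the single blocked task stands in for a dispatcher thread stuck in Await.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object ShutdownNowDemo {
  // Returns (terminated after shutdown(), terminated after shutdownNow()).
  def run(): (Boolean, Boolean) = {
    val pool    = Executors.newFixedThreadPool(1)
    val never   = Promise[Unit]().future // never completed
    val started = new CountDownLatch(1)

    pool.execute(() => {
      started.countDown()
      try Await.result(never, Duration.Inf)
      catch { case _: InterruptedException => () } // task ends once interrupted
    })
    started.await()

    // shutdown() stops accepting new tasks but does not interrupt the
    // running one, so the pool cannot terminate while the task is blocked.
    pool.shutdown()
    val afterShutdown = pool.awaitTermination(200, TimeUnit.MILLISECONDS)

    // shutdownNow() additionally interrupts the worker thread.
    pool.shutdownNow()
    val afterShutdownNow = pool.awaitTermination(5, TimeUnit.SECONDS)

    (afterShutdown, afterShutdownNow)
  }
}
```

This only works because the blocked call responds to interrupts, as `Await.result` does; a task that ignores interrupts would keep the pool alive even after `shutdownNow()`.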

Is there any way I can get access to the executor service that is used inside all dispatchers?

Regards,
Kyrylo

I wouldn’t shut down all dispatchers this way. Since these are blocking tasks, they should run on dedicated dispatchers anyway, and those specific dispatchers could be shut down like this.
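
As an illustration of that separation, the sketch below uses two plain JDK thread pools as stand-ins for a default dispatcher and a dedicated blocking dispatcher; the names `DedicatedBlockingPoolDemo`, `defaultPool` and `blockingPool` are hypothetical. Only the blocking pool is stopped with shutdownNow, while the other pool gets a regular shutdown and finishes its work normally.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object DedicatedBlockingPoolDemo {
  // Returns (result of the blocked task, result of the regular task).
  def run(): (String, String) = {
    // Stand-ins for a default dispatcher and a dedicated blocking dispatcher.
    val defaultPool  = Executors.newFixedThreadPool(2)
    val blockingPool = Executors.newFixedThreadPool(2)
    val defaultEc    = ExecutionContext.fromExecutorService(defaultPool)
    val blockingEc   = ExecutionContext.fromExecutorService(blockingPool)

    val never   = Promise[String]().future // never completed
    val started = new CountDownLatch(1)

    // The blocking call runs only on the dedicated pool and translates
    // an interrupt into a value instead of letting it escape.
    val blocked = Future {
      started.countDown()
      try Await.result(never, Duration.Inf)
      catch { case _: InterruptedException => "interrupted" }
    }(blockingEc)

    // Non-blocking work stays on the other pool.
    val regular = Future("done")(defaultEc)

    started.await()
    blockingPool.shutdownNow() // forceful: interrupts the blocked thread
    defaultPool.shutdown()     // graceful: already submitted work still runs

    val result = (Await.result(blocked, 5.seconds), Await.result(regular, 5.seconds))
    defaultPool.awaitTermination(5, TimeUnit.SECONDS)
    result
  }
}
```

The interrupt is caught inside the task on purpose: how an InterruptedException that escapes a Future body is handled differs between Scala versions, so the sketch does not rely on it.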

There is no public API for accessing the executor service but it can be done like this:

scala> val d = system.asInstanceOf[ExtendedActorSystem].dispatchers.lookup("akka.actor.default-blocking-io-dispatcher")
d: akka.dispatch.MessageDispatcher = Dispatcher[akka.actor.default-blocking-io-dispatcher]

scala> val d2 = d.asInstanceOf[akka.dispatch.Dispatcher]
d2: akka.dispatch.Dispatcher = Dispatcher[akka.actor.default-blocking-io-dispatcher]

scala> val m = classOf[akka.dispatch.Dispatcher].getDeclaredMethod("executorService")
m: java.lang.reflect.Method = public final akka.dispatch.ExecutorServiceDelegate akka.dispatch.Dispatcher.executorService()

scala> val executorService = m.invoke(d2).asInstanceOf[java.util.concurrent.ExecutorService]
executorService: java.util.concurrent.ExecutorService = akka.dispatch.Dispatcher$LazyExecutorServiceDelegate@50af4885

scala> executorService.shutdownNow()