Mutate an actor's internal state after a Future completes, in another run of receive?

When writing a websocket server with akka-actor, I found that there is a need (and possibly a pattern) to create a Future when handling a message, and mutate actor’s internal state after the Future completes.

My current (hopefully valid) solution is to send the actor itself another message ( ScheduledComputation ), and mutate own state in a future run of receive ( handleScheduledComputation ). The detail can be found in attached code.

In this way I can write conceptually consecutive computations together (like in handleUserMessage ), but as I’m new to akka and actors, I feel insecure to use it before hearing from others (the thing I wanted to do may be an anti-pattern after all).

Any comments are welcome.

// ActorFutureRunnableSupport.scala
import akka.actor.Actor

import scala.concurrent.{Future, Promise}
import scala.util.Try

private case class ScheduledComputation[A](createdIn: Actor, computation: () => Try[A]) {

  private val executed    = Promise[A]
  private lazy val result = computation()

  /**
    * MUST be executed within another receive() run of the same actor
    * MUST execute only once
    *
    * @param runIn
    */
  def run(runIn: Actor): Unit = {
    if (createdIn ne runIn) {
      throw new IllegalThreadStateException(
        s"Continuation is created in $createdIn but executed in $runIn")
    }
    executed.complete(result) // would throw if already complete-d
  }

  val future: Future[A] = executed.future
}

trait ActorFutureRunnableSupport extends Actor {
  protected def handleScheduledComputation: Receive = {
    case c: ScheduledComputation[Any] =>
      c.run(this)
  }

  protected def runInNextTick[A](computation: () => Try[A]): Future[A] = {
    val cont: ScheduledComputation[A] = ScheduledComputation(this, computation)
    self ! cont
    cont.future // completes after cont.run called by handleContinuation
  }
}
// ExampleActor.scala
import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorRef}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import io.jokester.util.akka.actor.ActorFutureRunnableSupport

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

case class UserMessage()
case class ServerMessage()

case class ExampleActorState()
case class HandlerActorResponse()

abstract class ExampleActor extends Actor with ActorFutureRunnableSupport {

  var state: ExampleActorState = ExampleActorState()

  def receive: Receive = handleScheduledComputation.orElse(handleUserMessage)

  def handleUserMessage: Receive = {
    case req: UserMessage =>
      implicit val (timeout, ec) = defaultAsyncTimeout()
      val res: Future[ServerMessage] = createHandlerActorIfNotExist() // create a (maybe remote) handler actor if not exist
        .flatMap(handleActor => handleActor ? req)                    // ask (maybe new created) handlerActor
        .mapTo[HandlerActorResponse]
        .flatMap(handled => {
          runInNextTick(() => {
            state = ???                                               // mutate own state after ask completes, in another run of receive()
            Success(ServerMessage())
          })
        })

      pipe(res) to sender
  }

  abstract def createHandlerActorIfNotExist(): Future[ActorRef]

  def defaultAsyncTimeout(): (Timeout, ExecutionContext) = {
    val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
    val ec               = context.system.dispatcher
    (timeout, ec)
  }
}

A better way, avoiding the risk of closing over mutable state in a closure (for example like trying to touch the state of the actor in flatMap in your example code) is to use the pipe pattern to get a message with the completed (or failed) result of the future.

1 Like