Wrapping expensive async calls in Akka Streams in a cache

Is there an idiomatic way to wrap an expensive async operation in an Akka Streams application in a cache (e.g. LRU or LFU)?

I have a fairly simple stream: the data comes from Kafka, some transformations and filtering are applied to it, and it is written back to Kafka.

final case class Data(key: Key, values: Values)

My issue is that in order to transform the data, I need to query an external service, with the key being some part of the element passing through the stream. We can represent that with a function like this:

def withRules(data: Data): Future[(Data, Rules)]

That would allow me to hook it into the stream like this:

...
  .mapAsync(parallelism)(withRules)
  .map(applyRules)
...

I have a contract with the external service: data with the same key always has the same rules, and the rules for a given key never change.

I would like to leverage that knowledge so that I perform only one external call per key and cache the result. It does not need to be backed by any persistence; an in-memory cache is fine. To stop the cache from growing as time (and data) goes by, some sort of eviction would be nice.

The only idea I had was to create an actor that wraps the service and acts as a cache; I’d then ask that actor for the rules.

val cachedServiceActor: ActorRef = ??? // wraps the external service and caches results
...
  .mapAsync(parallelism)(data => cachedServiceActor.ask(EnrichWithRules(data)).mapTo[(Data, Rules)])
  .map(applyRules)
...
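
where EnrichWithRules is just a placeholder message I made up for this sketch:

final case class EnrichWithRules(data: Data) // request: enrich data with the rules for data.key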

However, I see a problem with that: an actor processes messages one by one, meaning I’ve lost my parallelism with this solution.

Could someone shed some light on my predicament? :smiley:

I’ve come up with a possible solution using LfuCache from akka-http-caching.

import akka.actor.{Actor, ActorRef, ActorSystem}
import akka.http.caching.LfuCache
import akka.http.caching.scaladsl.{Cache, CachingSettings, LfuCacheSettings}
import akka.util.Timeout

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

// LookupKey, ExpensiveValue and FindByKey are domain types/messages from my code.
class CachingActor(lookupActor: ActorRef) extends Actor {

  import akka.pattern.{ask, pipe}

  private implicit val actorSystem: ActorSystem = context.system
  private implicit val askTimeout: Timeout = Timeout(5.seconds)
  private implicit val ec: ExecutionContext = context.dispatcher

  val defaultCachingSettings: CachingSettings = CachingSettings(context.system)

  val lfuCacheSettings: LfuCacheSettings = defaultCachingSettings.lfuCacheSettings
    .withInitialCapacity(128)
    .withMaxCapacity(256)
    .withTimeToLive(5.minutes)
    .withTimeToIdle(1.minute)

  val cachingSettings: CachingSettings =
    defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings)

  // Build the cache from the customised settings above, not the defaults.
  val lfuCache: Cache[LookupKey, ExpensiveValue] = LfuCache(cachingSettings)

  // On a cache miss, delegate to the lookup actor; mapTo safely narrows
  // the untyped ask result.
  private val delegateLoad: LookupKey => Future[ExpensiveValue] =
    lookupKey => (lookupActor ? FindByKey(lookupKey)).mapTo[ExpensiveValue]

  override def receive: Receive = {
    case FindByKey(lookupKey) =>
      lfuCache
        .getOrLoad(lookupKey, delegateLoad)
        .pipeTo(sender())
  }
}

Would something like this be an OK approach?

No, you can have parallelism with only one actor. It is correct that an actor will only process one message at a time, but that does not mean an actor can only start working on something new once the previous task is completed.

Say, for example, your actor receives a command to obtain an expensive result asynchronously. The actor starts retrieving the result, in the form of a future. But instead of blocking while waiting for the future to complete, the actor uses the pipeTo pattern to pipe the result to itself. It is then free to receive new commands in the meantime and can have multiple expensive requests in flight at once. It will receive the results as the futures complete, and you can do with them what you want - for example, return each result to the original requester/sender.
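
A minimal sketch of the idea, with made-up names; here the future is piped straight back to the requester (pipeTo captures sender() immediately, so this is safe):

import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}

final case class Compute(key: String) // hypothetical request message

class ExpensiveWorker(expensiveLookup: String => Future[String]) extends Actor {
  private implicit val ec: ExecutionContext = context.dispatcher

  override def receive: Receive = {
    case Compute(key) =>
      // Start the async work and pipe the eventual result to the requester.
      // The actor does not block and is immediately free to handle the next
      // Compute, so many lookups can be in flight at once.
      expensiveLookup(key).pipeTo(sender())
  }
}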

If the expensive operation does not come in the form of a future but is, for example, blocking, then you simply wrap it in a future yourself.
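
For example, inside an actor (the dispatcher name is hypothetical; running blocking work on a dedicated dispatcher is just the usual recommendation so it does not starve the default one):

// Sketch: wrap a synchronous, blocking lookup in a Future.
val blockingEc: ExecutionContext =
  context.system.dispatchers.lookup("my-app.blocking-dispatcher")

def lookupAsync(key: LookupKey): Future[ExpensiveValue] =
  Future(blockingLookup(key))(blockingEc) // blockingLookup is your synchronous call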

Your approach of having an actor manage the cache (which you can then plug in) is good design in my opinion, and viable as well.

Did that make sense?


Yeah, it does make sense, thank you.

However, I’m not sure how I’d include that approach in my CachingActor.

Do you have any advice on that as well?

Looks like you are already doing it: lfuCache.getOrLoad already returns a future, which you then pipe to the sender. The actor becomes ready for a new message immediately after receiving a FindByKey command, while getOrLoad runs on a different thread and its result is piped back afterwards. So your actor already supports, say, “unbounded” parallelism.
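
So in the stream it stays as you sketched earlier, e.g. (assuming Data’s key is your LookupKey, the actor replies with your Rules type, and source, cachingActor, parallelism and an implicit ExecutionContext are in scope):

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val askTimeout: Timeout = Timeout(5.seconds)

source
  .mapAsync(parallelism) { data =>
    // Cached keys complete without touching the external service; the
    // caching actor only does the getOrLoad bookkeeping, not the expensive work.
    (cachingActor ? FindByKey(data.key))
      .mapTo[Rules]
      .map(rules => (data, rules))
  }
  .map(applyRules)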


Ah, I thought I would need to split that logic somehow, sending “private” messages to itself and then forwarding the result to the original sender.

Thank you very much for your input!