Is there an idiomatic way to wrap an expensive async operation in an Akka Streams application with a cache (LRU, LFU)?
I have a fairly simple stream: data comes from Kafka, goes through some transformations and filtering, and is written back to Kafka. Each element looks like this:
final case class Data(key: Key, values: Values)
My issue is that, in order to transform the data, I need to query an external service, using the key of each element that passes through the stream. We can represent that with a function like this:
def withRules(data: Data): Future[(Data, Rules)]
That would allow me to hook this up in a stream like this:
...
  .mapAsync(parallelism)(withRules)
  .map(applyRules)
...
I have a contract with the external service: data with the same key always has the same rules, and the rules for a given key never change.
I would like to leverage that knowledge so that I perform only one external call per key and cache the result. The cache does not need to be backed by any persistence; an in-memory one is enough. To stop the cache from growing without bound as time (and data) goes by, some sort of eviction would be nice.
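To make the desired behaviour concrete, here is a rough plain-Scala sketch of what I mean, not necessarily the idiomatic Akka way. `LruFutureCache` is a hypothetical helper I made up for illustration, and it assumes the service can be called by key alone. It stores the `Future` itself, so concurrent requests for the same key share one call, and it evicts the least recently used entry once `maxEntries` is exceeded.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not part of Akka): a bounded, in-memory LRU cache
// that memoizes the Future returned by `load` for each key.
final class LruFutureCache[K, V](maxEntries: Int)(load: K => Future[V])(
    implicit ec: ExecutionContext) {

  // accessOrder = true keeps entries ordered from least to most recently used;
  // removeEldestEntry drops the eldest entry once the size limit is exceeded.
  private val entries: JLinkedHashMap[K, Future[V]] =
    new JLinkedHashMap[K, Future[V]](16, 0.75f, true) {
      override def removeEldestEntry(eldest: JMap.Entry[K, Future[V]]): Boolean =
        size() > maxEntries
    }

  def get(key: K): Future[V] = entries.synchronized {
    val cached = entries.get(key)
    if (cached != null) cached
    else {
      // Only starting the call happens under the lock; the call itself is async.
      val fresh = load(key)
      entries.put(key, fresh)
      // Do not keep failed lookups, so a later element can retry.
      fresh.failed.foreach(_ => entries.synchronized { entries.remove(key, fresh) })
      fresh
    }
  }
}
```

With a hypothetical `fetchRules(key: Key): Future[Rules]` and `val rulesCache = new LruFutureCache[Key, Rules](10000)(fetchRules)`, the stage would become something like `.mapAsync(parallelism)(data => rulesCache.get(data.key).map(rules => (data, rules)))`. What I am unsure about is whether sharing a structure like this between stream stages is considered idiomatic.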
The only idea I had was to create an actor which would wrap the service and act as a cache, and I’d then ask that actor for the rules.
val cachedServiceActor ...
...
  .mapAsync(parallelism)(data => cachedServiceActor.ask(EnrichWithRules(data)))
  .map(applyRules)
...
However, I see a problem with that: an actor processes messages one at a time, so with this solution the lookups are serialized and I’ve lost my parallelism.
Could someone shed some light on my predicament?