Mutable State in conflateWithSeed functions

I understand that conflateWithSeed is meant to be used with a fold-style pure function, but I was wondering whether it is safe to accumulate state in a mutable data structure, as in the example below.

The function aggregate is being called by different threads, but I assume memory barriers are applied by Akka when different threads execute the Actor logic.


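  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Source
  import scala.collection.mutable
  import scala.concurrent.duration._
  import scala.util.Random

  // The enclosing object was cut off in the post; this name is illustrative.
  object ConflateExample extends App {
    implicit val sys = ActorSystem()
    implicit val mat = ActorMaterializer()

    // Creates a fresh map each time conflation starts a new aggregate.
    def seed(i: Int): mutable.LinkedHashMap[Int, Int] = mutable.LinkedHashMap[Int, Int](i -> 1)

    // Mutates the map it was handed and returns that same instance.
    def aggregate(state: mutable.LinkedHashMap[Int, Int], i: Int): mutable.LinkedHashMap[Int, Int] = {
      state.update(i, state.getOrElse(i, 0) + 1)
      state
    }

    Source(1 to 10000)
      .map(_ => Random.nextInt(100))
      .conflateWithSeed(seed)(aggregate)
      .throttle(1, 1.second)
      .runForeach(println)
  }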

Thinking about this a bit more, this is OK because we are not closing over mutable state but creating it through the seed function. The state is neither written nor read outside of the conflation stage.

See the thread "State inside of flow operators?"

In short, mutable state is only possible if there is only one materialization of the flow, except for .statefulMapConcat.

Thanks for the reply. I’ve seen that thread, but I don’t think that rule applies here, as the lambdas are not closing over any mutable state and can therefore be shared.

It should work fine as long as you are not materializing multiple times. I’d argue, though, that the risk of someone coming along and changing the working code in a way that causes that to happen (wrapping some part of it in some form of restart, or just re-using the stream blueprint, since that is generally safe) is relatively high, and that is a good enough reason to try to avoid it.

One way you could make it foolproof is by nesting it in a Flow.lazyFlow, since that lazy factory function will be called once per materialization, ensuring you get a new instance of your mutable collection for each stream started from the blueprint.
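A minimal sketch of that idea, assuming Akka 2.6 or later (where Flow.lazyFlow is available and an implicit ActorSystem is enough to run a stream). The names LazyFlowSketch, countingFlow and runOnce are made up for illustration; the flow deliberately closes over a mutable map, which is the case lazyFlow is meant to protect.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical names throughout; only the Akka calls come from the library.
object LazyFlowSketch {
  // The factory passed to Flow.lazyFlow runs once per materialization, so each
  // stream started from this blueprint closes over its own `counts` map.
  val countingFlow: Flow[Int, Map[Int, Int], Future[NotUsed]] =
    Flow.lazyFlow { () =>
      val counts = mutable.LinkedHashMap.empty[Int, Int]
      Flow[Int].map { i =>
        counts.update(i, counts.getOrElse(i, 0) + 1)
        counts.toMap // emit an immutable snapshot downstream
      }
    }

  // Materializes the blueprint once and returns the last snapshot it emitted.
  // Assumes `input` is non-empty, since Sink.last fails on an empty stream.
  def runOnce(input: List[Int])(implicit sys: ActorSystem): Map[Int, Int] =
    Await.result(Source(input).via(countingFlow).runWith(Sink.last), 5.seconds)
}
```

Calling runOnce twice with the same blueprint should give independent counts, because the second materialization builds a new map instead of continuing with the first one.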

I agree on the principle of staying pure; this question is more about special cases where we are worried about performance. Thanks for the lazyFlow idea. However, I don’t see how this could cause an issue even with multiple materialisations. Could you please elaborate on it, maybe with an example?

Note that seed creates a new container for aggregation and aggregate does not close over any mutable reference, but passes back what it got after mutating it.

I don’t see how this could cause an issue even with multiple materialisations. Could you please elaborate on it maybe with an example?

No, you are right, because conflateWithSeed will actually invoke the seed function every time a new aggregate is needed. It does not work like fold, for example, which would end up sharing the same mutable zero value across materializations.

It is perfectly safe to do as you do.
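To make the difference concrete, here is a plain-Scala sketch with no Akka involved. FoldLike and SeedLike are hypothetical stand-ins that only mimic how a zero value versus a seed function is held by a reusable blueprint; they are not the actual operator implementations.

```scala
import scala.collection.mutable

object SeedVersusZero {
  type Counts = mutable.LinkedHashMap[Int, Int]

  // Same shape as `aggregate` in the question: mutate, then return the same map.
  def aggregate(state: Counts, i: Int): Counts = {
    state.update(i, state.getOrElse(i, 0) + 1)
    state
  }

  // Stand-in for a fold-style blueprint: `zero` is created once and every run
  // starts from (and mutates) that same instance.
  final class FoldLike(zero: Counts) {
    def run(input: List[Int]): Counts = input.foldLeft(zero)(aggregate)
  }

  // Stand-in for a conflateWithSeed-style blueprint: the seed function is
  // called on each run, so every run gets its own map.
  // Assumes `input` is non-empty.
  final class SeedLike(seed: Int => Counts) {
    def run(input: List[Int]): Counts = input.tail.foldLeft(seed(input.head))(aggregate)
  }

  def main(args: Array[String]): Unit = {
    val foldLike = new FoldLike(mutable.LinkedHashMap.empty[Int, Int])
    foldLike.run(List(1, 1, 2))
    println(foldLike.run(List(1))) // counts from the first run leak in: 1 is counted 3 times

    val seedLike = new SeedLike(i => mutable.LinkedHashMap(i -> 1))
    seedLike.run(List(1, 1, 2))
    println(seedLike.run(List(1))) // fresh map: 1 is counted once
  }
}
```

The second FoldLike run carries over the first run's counts because both share one map, while each SeedLike run starts from whatever the seed function just created.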

Sorry about the incorrect info above, there are so many operators to keep track of!

Ok, thanks. It’s good to keep in mind that we can’t do this with all operators.