State inside of flow operators?

Is mutable state allowed inside of flow operators, as long as the state does not escape the operator?

  Source(1 to 100)
      .async // let's introduce threat-safety issues
      .map {
        var mutableNumber = 0 // mutated whenever an element is received
        item => {
          mutableNumber += item // non-atomic operation
          mutableNumber
        }
      }
      .runForeach(println)

I tried as hard as I could to create a race condition using async and Thread.sleep in-between operations, but it appeared impossible - which would be great. As I understand it, operators run inside of actors, so I presume they provide the same single-threaded illusion. Here is an example of such an attempt to create a race condition:

  Source(1 to 100)
      .async
      .map {
        @volatile var mustBeFalse = false
        item => {
          if (mustBeFalse) {
            // this never happens
            println("Oh no, race condition.")
            throw new IllegalStateException()
          }
          mustBeFalse = true
          Thread.sleep(20)
          mustBeFalse = false
          item
        }
      }
      .async // just in case
      .runForeach(println)
1 Like

No need to withdraw: it is quite important to note that stream combinators process elements in a strictly sequential fashion. This is — as you guessed — ensured by being implemented on top of actors. Parallelism is possible, but it is always explicit, by the use of the Future type (as in mapAsync and friends).

1 Like

One problem is when you materialize the stream more than one time, then the same lambda (and the mutable state) will be unsafely shared between those instances.

Consider the following:

   val stream = Source(0 to 10)
      .map {
        println("creating one lambda with a counter")
        var counter = 0

        { (n: Int) =>
          counter = counter + 1
          println(s"counter increased: $counter")
          n
        }
      }.to(Sink.ignore)

  stream.run()
  stream.run()
  stream.run()
2 Likes

and that is why statefulMapConcat and several other operators takes a factory of a function

Great, thanks a lot for the answers - I can see now what I have to watch out for.

Personally, design wise, I think the flow operators would be a great place to use Scala’s v: => TypeName syntax (by-name parameters) which would cause re-evaluation of any expression upon stream materialization. The operators already provide Actor-like single threaded illusion, might as well use it ;)

I can hear the purist’s complaint that by-name parameters ares not explicit, but I don’t think that this would lead to unexpected behavior. The reason is that in the explicit format you cannot have mutable state in the operators across multiple materialization. For this reason changing to : => won’t break existing implementations, because that change would make no difference for stateless operations, or stateful operations that are only materialized once (the currently allowed cases). You could say, it’s not breaking the Liskov Substitution Principle.

All it does is making mutable state on a per-operator basis possible, analogous to an Actor Behavior with the single-threaded illusion. I believe this is the intuitive behavior.

Edit:

A good small-scale sample of this is also https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/fold.html, the red box saying

Warning: Note that the zero value must be immutable […]

With the : => operator, such a warning would not be necessary and the behavior would be as intuitively expected.

PS: I expect that there are probably good reasons why this hasn’t been done, but I’m still throwing this out for possible consideration.

I’ve opened an issue suggesting this: https://github.com/akka/akka/issues/28474

I don’t expect that this will/can be done, but at the same time I feel like it has some merit. Personally I miss this a lot, since I end up implementing my own GraphStage for most things that contain mutable state, unless statefulMapConcat is a better fit (I wasn’t sure where the boundaries were, so I defaulted to “no state” in flow operations apart from statefulMapConcat).

I follow this topic from the beginning and still don’t get it…

If you want a “state for every materialization” you have the .statefulMapConcat.
If you want a “state globally in your app” you still can use a handmade actor and .ask.

I think the idea that you write var anywhere in your scala code besides actors or really low level runtime critical functions is “bad” by default. (But you can use recursion and akka-typed actor-behaviour so you can skip the var part all together!)

I follow this topic from the beginning and still don’t get it…

I’ve come across this need many times and usually end up writing my own GraphStage. Many problems cannot be solved with statefulMapConcat, or end up ugly. For example, what would be your cleaner solution to the following post from a couple of days ago? (note the var lastKey)

What is the problem with this?

Source(items)
    .throttle(1, 1.second)
    .statefulMapConcat{ () => {
      var lastKey: Option[String] = None
      item:Groupygroup  =>
        lastKey match {
          case Some(item.key) | None =>
            lastKey = Some(item.key)
            (item -> false) :: Nil
          case _ =>
            lastKey = Some(item.key)
            (item -> true) :: Nil
        }
    }}
    .splitWhen(SubstreamCancelStrategy.drain) {
      _._2
    }
    .map(_._1)
    .fold(Vector.empty[Groupygroup])(_ :+ _)
    .mergeSubstreams
    .runForeach(println)

The big difference between this one and the one you provided that it has a state for every materialization (bcs of the () => A => List[B] param) while yours will probably create one single shared param for every materialization. (So for example if it would be just a sink for a tcp or websocket stream, it would be shared all accross the clients (without threadsafety).)

You can changed the statefulMapoConcat to an ask if your decider-actor returns with an (A,Boolean) (or a wrapper), and you instantly get an appwise global state with threadsafety. (You probably can have problems with the build in buffers, but for most cases it will be fine.)

1 Like

What is the problem with this?

Thanks for giving your approach. It’s a good solution in light of the current limitations. It consists of pre-computing the required state for splitWhen in a previous stage (statefulMapConcat) and then forward-propagating that state to splitWhen, followed by a removal of that state from the stream after it has been used. It’s a roundabout way to give splitWhen access to the state in statefulMapConcat. This stands in contrast to simply having the state in splitWhen itself, which, as you note, is currently not supported.

This approach would have to be applied to all non-statefulMapConcat operators that require some form of state: Pre-compute in statefulMapConcat, forward-pass, apply, remove.

I believe it would be cleaner if all operators would support multiple materializations with By-Name parameters - analogous to what you expect when you spawn multiple instances of the same actor behavior.

Personally, unless statefulMapConcat is exactly what I need, I would always end up writing my own GraphStage instead.

The big difference between this one and the one you provided that it has a state for every materialization while yours will probably create one single shared param for every materialization.

That’s exactly the “limitation” that I’m addressing, correct. It’s a design choice. By-Name parameters could solve this.

Yeah, now I understand what is your “main” problem; it’s not intuitive at the beginning :D I remember I faced it too. I tried to handle a “global” state in a custom build graph by switching a flag in one stage to modify the behaviour of a filter in an another stage.
The reason why the “global” thing comes in is bcs of the cached graph building mechanism. A graph only builds when its “materialized” so basicly you need to provide a lazy function to make it “variable safe” when you provide the pieces. This is the reason why the statefulMapConcat has its somewhat ugly { () => {asd: A => ???}} kind of body. But I think, when you start to complicate things it is the behaviour that can save you from a lot of issue too. (Also the java interop would be veary strange if the scala part would work with the lazy f: => A => B but the java part not, or it would be even worse if the java syntax would be forced to an () => A => B everywhere.)

BTW.: I’m playing with akka streams about 3 years from now. Almost every time (or at least at the first 2 years) when I maded a custom stage it was bcs I didn’t know that there is an already written stage which combined with other stages can achive the same output without the possibility that I mess it up big-time. Half of the cases when I wrote a custom stage it has a big edge-case which I missed in my first tests and I needed to go back and fix the initial implementation, after we found a minimal reproduceing test. So pls when you write a custom stage, at least be suspicious that maybe you can achive the same functionality without it :P

1 Like

I didn’t know that there is an already written stage which combined with other stages can achive the same output without the possibility that I mess it up big-time

Yes, I can relate.

Now, I just analysed a collection of 14 custom GraphStages for a more objective picture and the result is as follows:

  • 11 times it was statefulMap not statefulMapConcat that I needed. (Often I ended up re-using my own StatefulMap implementation, I’m counting those in)
  • 2 times I could have used fold with statefulMapConcat
  • 1 time it was warranted, for an advanced FanInShape2 that pushes and pulls elements from two inlets statefully with internal logic

A valid argument could be made that statefulMap is all that I need. While that’s true, I think there are some other operators (like splitWhen) that would benefit equally from a stateful alternative, it’s just that my particular use-case doesn’t rely on them much. Nevertheless, I think the set of operators that would benefit from a stateful version is very small.

Due to the default materialization scheme that places all combinators within a single actor (unless specified otherwise with .async) there is no significant overhead in using two operators, or said the other way around, there is no need to have a single operator for any possible thing you might want to do. Answering your concrete cases:

  • .statefulMap is easily emulated by emitting single-element lists from .statefulMapConcat
  • stateful .splitWhen is emulated by .statefulMapConcat computing a single-element List[(Bool, T)], splitting on the Bool and following it up with a .map(_._2)

Only after you have measured that these approaches are too slow (which applies only to a very tiny sliver of the problem space) should single-stage optimization be attempted. In my experience from implementing the standard operators I can say that a “trivial” combinator typically has 2–3 non-trivial signal flow bugs in its GraphStage callback implementation — the only good code you can write is code that you don’t write. GraphStage should mostly be reached for when the nature or topology of the desired element flow is not yet implementable with standard operators.

Reading the above, this thread has pivoted a little; I guess a fair summary of the initial point is that state handling is safe as long as the functions supplied to the Akka Streams API are all pure (in particular not closing over mutable references). Stateful combinators exist: they intentionally make it painfully obvious what is going on (e.g. fold/scan/reduce, expand/conflate, statefulMapConcat, … ). Adding support for impure functions — as proposed with the by-name syntax — would be counter to this goal, and it would also be leaky, because pure functions are the only ones that the API can make any guarantees for.

Regards,

Roland