GroupWhile on Akka Streams?

Hello,

Is there a way to achieve a groupWhile behaviour on akka-streams?

I have examined all the existing group/groupWithin/takeWhile etc. functions, but I would need a groupWhile function with the following characteristics:

  1. Given a sorted stream of records, e.g. reading a file whose records are sorted by Id.
  2. Create a new group/subflow and collect records while the Id is the same as the previous record’s (the predicate); once we encounter a new Id, emit the aggregation for the previous group and start a new group.

I suspect the answer to the above is statefulMapConcat, but I haven’t figured out an implementation yet.

Any ideas/recommendations on **statefulMapConcat** or another solution would be highly appreciated :)

I think you can do that with groupBy combined with a fold for the aggregate.

A tricky aspect is figuring out the end of one id and allowing the substream to complete. Perhaps you could achieve that using sliding to look at every 2 elements, and then a mapConcat to inject a special end-element for the previous id when the ids don’t match, and then use takeWhile(_ != end) in the substream?
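
Roughly, that idea could look something like this (an untested sketch off the top of my head; all names here are made up, and it relies on the ids being sorted so a closed substream never sees another element for its key):

import akka.stream.scaladsl.Source

sealed trait Elem
final case class Record(id: String, value: Int) extends Elem
final case class End(id: String) extends Elem // injected marker that closes the group for `id`
case object Sentinel extends Elem             // appended so the last record still shows up as the head of a pair

val records = Source(List(Record("a", 1), Record("a", 2), Record("b", 1)))

records
  .concat(Source.single[Elem](Sentinel))
  .sliding(2)
  .mapConcat {
    case Seq(a: Record, b: Record) if a.id != b.id => List(a, End(a.id)) // id changed: close the previous group
    case Seq(a: Record, _)                         => List(a)            // same id, or last record before the Sentinel
    case other                                     => other.collect { case r: Record => r }.toList
  }
  .groupBy(1024, {
    case r: Record => r.id
    case End(id)   => id
    case Sentinel  => "" // never reaches groupBy, only here to keep the function total
  })
  .takeWhile(!_.isInstanceOf[End]) // the injected End completes the substream for its id
  .fold(Vector.empty[Record]) {
    case (acc, r: Record) => acc :+ r
    case (acc, _)         => acc // only Records actually get here
  }
  .mergeSubstreams

The last group never gets an End injected, but its substream completes together with the upstream, so the fold still emits it.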

Hi @johanandren,

Thanks for your answer and the recommendations, but I still fail to see how to put everything together effectively. groupBy would have been ideal if it weren’t keeping track of all keys in memory; I basically want to close the group and push it to the substream/downstream as soon as we encounter a new key.

From what you are describing, you want to aggregate the stream of elements into batches grouped by a key, where each batch ends when the next one starts (i.e. elements with an equal key directly follow each other).

EDIT: You can ignore all of this and go to my last post. I leave the other posts here because they may still be helpful in other scenarios.

statefulMapConcat won’t know when the stream ends, so you have to emit a special termination element; otherwise the operation will wait forever for the grouping key to change, which is what signals the end of a group. If you don’t want to use a termination element, you’ll have to write your own GraphStage Flow implementation, which is notified of stream termination and can react appropriately (https://doc.akka.io/docs/akka/current/stream/stream-customize.html); that’s what I do in such cases. Alternatively, you may also use groupBy and fold on the substreams, but that has the negative side effect that aggregated batches only become available once the stream completes.
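
For illustration, a minimal, untested sketch of what such a GraphStage could look like (everything here, including the GroupWhile name, is made up rather than an existing API; it collects consecutive elements with the same key and flushes the open batch in onUpstreamFinish, so no termination element is needed):

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Collects consecutive elements that share the same key and emits them as one batch.
class GroupWhile[T, K](key: T => K) extends GraphStage[FlowShape[T, Vector[T]]] {

  val in: Inlet[T] = Inlet("GroupWhile.in")
  val out: Outlet[Vector[T]] = Outlet("GroupWhile.out")
  override val shape: FlowShape[T, Vector[T]] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      private var batch = Vector.empty[T]

      override def onPush(): Unit = {
        val elem = grab(in)
        if (batch.isEmpty || key(batch.head) == key(elem)) {
          // same group: keep collecting and ask upstream for more
          batch :+= elem
          pull(in)
        } else {
          // key changed: emit the finished batch and start a new one with the current element
          val finished = batch
          batch = Vector(elem)
          push(out, finished)
        }
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        // no termination element needed: flush the open batch before completing
        if (batch.nonEmpty) emit(out, batch)
        completeStage()
      }

      setHandlers(in, out, this)
    }
}

You would then use it as source.via(new GroupWhile[MyRecord, String](_.key)).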

I’ve written a working example of a statefulMapConcat implementation for you. You’re welcome ;)

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object Application extends App {

  implicit val system = ActorSystem("Groupygroup")

  case class Groupygroup(key: String, value: Int)

  val End = Groupygroup("--END--", 0)
  val items = (1 to 3).map(Groupygroup("foo", _)) ++
      (1 to 3).map(Groupygroup("bar", _)) ++
      (1 to 3).map(Groupygroup("baz", _)) :+
      End

  Source(items).statefulMapConcat[Seq[Groupygroup]](() => {
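    // per-materialization state: the batch collected so far and the key it belongs to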
    var batch = List.empty[Groupygroup]
    var lastKey: Option[String] = None
    item =>
      lastKey match {
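        // same key as the previous element (or the very first element): keep collecting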
        case Some(item.key) | None =>
          lastKey = Some(item.key)
          batch ::= item
          Nil
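        // key changed (or the End marker arrived): emit the finished batch and start a new one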
        case _ =>
          lastKey = Some(item.key)
          val result = batch.reverse
          batch = item :: Nil
          result :: Nil
      }
  }).runForeach(println)
}

Result:

List(Groupygroup(foo,1), Groupygroup(foo,2), Groupygroup(foo,3))
List(Groupygroup(bar,1), Groupygroup(bar,2), Groupygroup(bar,3))
List(Groupygroup(baz,1), Groupygroup(baz,2), Groupygroup(baz,3))

Of course you could make the code nicer, with sealed traits etc. This is just a starter.
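
For instance, a variant with a sealed trait, so the terminator can never collide with real data, could look roughly like this (untested sketch; it assumes the same implicit ActorSystem as above):

import akka.stream.scaladsl.Source

sealed trait Msg
final case class Item(key: String, value: Int) extends Msg
case object End extends Msg

val items: List[Item] =
  List(Item("foo", 1), Item("foo", 2), Item("bar", 1))

Source[Msg](items :+ End)
  .statefulMapConcat[Seq[Item]](() => {
    var batch = List.empty[Item]
    var lastKey: Option[String] = None

    {
      case item: Item if lastKey.forall(_ == item.key) =>
        // same key as before (or very first element): keep collecting
        lastKey = Some(item.key)
        batch ::= item
        Nil
      case item: Item =>
        // key changed: emit the previous batch and start a new one
        lastKey = Some(item.key)
        val finished = batch.reverse
        batch = item :: Nil
        finished :: Nil
      case End =>
        // explicit terminator: flush whatever is left
        if (batch.isEmpty) Nil else batch.reverse :: Nil
    }
  })
  .runForeach(println)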

groupBy approach (does not require a terminal element, but only emits batches once upstream completes):

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object Application extends App {

  implicit val system = ActorSystem("Groupygroup")

  case class Groupygroup(key: String, value: Int)

  val items = (1 to 3).map(Groupygroup("foo", _)) ++
      (1 to 3).map(Groupygroup("bar", _)) ++
      (1 to 3).map(Groupygroup("baz", _))
  
  Source(items)
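      // one substream per key; fold only emits when its substream completes, i.e. once upstream completes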
      .groupBy(10, _.key)
      .fold(Vector.empty[Groupygroup])(_ :+ _)
      .mergeSubstreams
      .runForeach(println)
}

Result (note batch order is not preserved):

Vector(Groupygroup(bar,1), Groupygroup(bar,2), Groupygroup(bar,3))
Vector(Groupygroup(foo,1), Groupygroup(foo,2), Groupygroup(foo,3))
Vector(Groupygroup(baz,1), Groupygroup(baz,2), Groupygroup(baz,3))

Ok, this is how you can actually do it: no hacks, just a clean solution. Note that I’m throttling the source for demo purposes:

import akka.actor.ActorSystem
import akka.stream.SubstreamCancelStrategy
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

object Application extends App {

  implicit val system = ActorSystem("Groupygroup")

  case class Groupygroup(key: String, value: Int)

  val items = (1 to 3).map(Groupygroup("foo", _)) ++
      (1 to 3).map(Groupygroup("bar", _)) ++
      (1 to 3).map(Groupygroup("baz", _))

  Source(items)
      .throttle(1, 1.second)
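      // open a new substream whenever the key changes; each substream is then folded into a single batch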
      .splitWhen(SubstreamCancelStrategy.drain) {
        var lastKey: Option[String] = None
        item =>
          lastKey match {
            case Some(item.key) | None =>
              lastKey = Some(item.key)
              false
            case _ =>
              lastKey = Some(item.key)
              true
          }
      }
      .fold(Vector.empty[Groupygroup])(_ :+ _)
      .mergeSubstreams
      .runForeach(println)
}

I’ve run into similar use cases frequently, so I was thinking about this question on and off today. Usually I ended up writing my own GraphStage, as stated above.

Not sure if there are any reasonable objections to the var in splitWhen, but I don’t think so.

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/splitWhen.html

Hi @ignatius,

Wow, thank you very much for all the solutions you have provided. You highlighted very well what I’m trying to achieve and the limitations of using groupBy, since it keeps everything in memory and only emits once upstream has completed, which is not ideal.

Thanks for the statefulMapConcat example; now I understand how it works and why the End element is needed. I would have thought it would emit once upstream ends, or that it had a special way to detect this, but the manual end element should work as well.

I will try out the splitWhen implementation, which seems cleaner for my needs.

I have also discovered an implementation in akka-stream-contrib that fulfils this exact need, in case it’s helpful to anyone: https://github.com/akka/akka-stream-contrib/blob/master/src/main/scala/akka/stream/contrib/AccumulateWhileUnchanged.scala
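
Usage would presumably be something like this (untested; the exact constructor signature may differ between versions, so better check the linked source):

import akka.stream.contrib.AccumulateWhileUnchanged
import akka.stream.scaladsl.Source

case class Record(id: String, value: Int)

val records = Source(List(Record("a", 1), Record("a", 2), Record("b", 1)))

records
  .via(new AccumulateWhileUnchanged[Record, String](_.id)) // emits one Seq[Record] per run of equal ids
  .runForeach(println)                                     // assuming an implicit ActorSystem is in scope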

I would need to verify the performance of these solutions with higher volumes of course.