Grouping and merging sorted Streams

Hello,

Given a type A that has a property A.location, I have some streams that emit objects of type A in non-decreasing order of A.location: consecutive objects may have the same location, and the order among objects with the same location is undefined.

Given a stream that emits objects of type A, say Source[A, Mat1], how can I group consecutive objects with the same location into, say, a Source[Set[A], Mat2]?

Given multiple streams of A, say Seq[Source[A, Mat1]], I want a new stream that emits one collection per location, containing all objects with that location, but in a way that still lets me tell which object came from which stream. So it can't just be Set[A], because then I wouldn't know where each object came from. I need something like Source[Seq[Set[A]], Mat3], where the first element of each Seq is the set of objects from the first stream, and so on.

Thanks!

For the first one, you can play with something like this:

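import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

// Akka 2.6+: an implicit ActorSystem also provides the Materializer for run*
implicit val system: ActorSystem = ActorSystem("grouping")

type Data = String
// Sentinel appended to flush the last group; it must never occur in the real data
val dummyData: Data = "this is not a valid element"
def looksSame(d1: Data, d2: Data): Boolean = d1 == d2
val list: List[Data] = "a" :: "a" :: "b" :: "b" :: "b" :: "c" :: "d" :: Nil

Source(list)
  .concat(Source.single(dummyData))
  .statefulMapConcat { () =>
    // Elements of the group being collected, most recent first
    var currentGroup: List[Data] = Nil
    val fun = (element: Data) =>
      currentGroup match {
        case Nil =>
          currentGroup = element :: Nil
          List.empty[List[Data]]
        case h :: _ if looksSame(h, element) =>
          // Same "location" as the current group: keep collecting
          currentGroup = element :: currentGroup
          List.empty[List[Data]]
        case _ =>
          // New "location": emit the finished group and start a new one
          val ret = currentGroup.reverse
          currentGroup = element :: Nil
          ret :: Nil
      }
    fun
  }
  .runForeach(println)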

The basic idea is to buffer the elements (in a list, for example), and when an element arrives whose “location” differs from the one being aggregated, emit the aggregated elements and start collecting the new “location”. The function returns a List[List[String]] in my example, so the stream emits List[String]s.
With this method the last group gets “stuck” in the state, because nothing arrives after it to trigger the emit. So you need to push it out with an element of a nonexistent “location” (dummyData above); that element then stays in the state when the stream completes and is never emitted.
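To check the grouping logic without running a stream, the same state machine can be modelled on a plain List. The sketch below is plain Scala (no Akka), and it swaps the dummy sentinel for an Option end marker: every element is wrapped in Some and a single None is appended, so no real value can be mistaken for the end of input. In Akka Streams the analogous step should be `.map(Option(_)).concat(Source.single(Option.empty[Data]))` before statefulMapConcat. The names `GroupingModel`, `groupConsecutive` and the `sameKey` parameter are hypothetical, chosen for illustration only.

```scala
// Hypothetical helper: a plain-Scala model of the statefulMapConcat grouping,
// not an Akka API. The List stands in for the stream.
object GroupingModel {
  def groupConsecutive[A](input: List[A])(sameKey: (A, A) => Boolean): List[List[A]] = {
    var currentGroup: List[A] = Nil // most recent element first

    // Same role as `fun` above; None plays the role of the dummy end element
    def step(element: Option[A]): List[List[A]] = (element, currentGroup) match {
      case (Some(e), Nil) =>
        currentGroup = e :: Nil
        Nil
      case (Some(e), h :: _) if sameKey(h, e) =>
        currentGroup = e :: currentGroup
        Nil
      case (Some(e), _) =>
        // Different key: emit the finished group and start a new one
        val finished = currentGroup.reverse
        currentGroup = e :: Nil
        finished :: Nil
      case (None, Nil) =>
        Nil // empty input: nothing to flush
      case (None, _) =>
        // End marker: flush the last group
        val finished = currentGroup.reverse
        currentGroup = Nil
        finished :: Nil
    }

    // Equivalent of appending the end marker, then applying the step function
    (input.map(Some(_)) :+ None).flatMap(step)
  }
}
```

For example, `GroupingModel.groupConsecutive(List("a", "a", "b"))(_ == _)` returns `List(List("a", "a"), List("b"))`. For the question's element type, `sameKey` would compare locations, e.g. `(x, y) => x.location == y.location`, and `.map(_.toSet)` turns each group into a Set[A].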

For the second:

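import akka.NotUsed
import akka.stream.scaladsl.Source

type From = String // label identifying the stream an element came from
type Data = String
val sources: Seq[(From, Source[Data, NotUsed])] = ???

// Tag every element with the label of its originating stream
val combined: Source[(From, Data), NotUsed] = Source(sources.toList)
  .flatMapConcat { case (f, s) =>
    s.map(f -> _)
  }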

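Depending on your use case, you may want flatMapMerge instead of flatMapConcat: flatMapConcat runs the sources one after another, while flatMapMerge (which takes a breadth parameter) consumes several of them concurrently.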
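Neither operator lines elements up by location, though: flatMapConcat emits the whole first source before the second, and flatMapMerge interleaves elements in arrival order. If you need the Source[Seq[Set[A]], Mat3] shape from the question, one option (not part of the answer above) is to merge the tagged sources by location and then group as in the first answer; Akka Streams' mergeSorted operator merges two sorted sources given an implicit Ordering. The sketch below only models the expected result of that merge-then-group step in plain Scala on in-memory lists, assuming every input is already sorted by location. The case class `A(location: Int, id: String)`, the object `MergeByLocation` and `alignByLocation` are hypothetical names for illustration.

```scala
// Hypothetical model of the per-location alignment; not an Akka API.
object MergeByLocation {
  // Hypothetical element type; only `location` matters for grouping
  final case class A(location: Int, id: String)

  // Each input must be sorted by location (non-decreasing). Returns one
  // Seq[Set[A]] per distinct location, in ascending order; position i holds
  // the elements from input i (an empty Set if input i has none there).
  def alignByLocation(inputs: Seq[List[A]]): List[Seq[Set[A]]] = {
    @annotation.tailrec
    def loop(rest: Seq[List[A]], acc: List[Seq[Set[A]]]): List[Seq[Set[A]]] = {
      val heads = rest.flatMap(_.headOption)
      if (heads.isEmpty) acc.reverse
      else {
        // Smallest location not yet emitted; at least one input starts with it
        val loc = heads.map(_.location).min
        // Split every input into (elements at loc, remainder)
        val split = rest.map(_.span(_.location == loc))
        loop(split.map(_._2), split.map(_._1.toSet) :: acc)
      }
    }
    loop(inputs, Nil)
  }
}
```

Because every input is sorted, `span` only ever takes a leading run, so each input is consumed once and the loop ends when all inputs are empty. An input with no element at the current location contributes an empty Set, which keeps position i tied to stream i.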

Awesome, thank you!