Combining Streams with disparate types

Apologies for the bad title, but not sure how to succinctly describe the question.

I have an unlimited Source[A, NotUsed] and a limited Source[B, NotUsed] where the latter is basically just a future that will return a list of B. I want to use the List[B] in logic to filter values in Source[A]

def getFilterList(filterCriteria): Future[Seq[Int]] = ???

def filterStream(filterCriteria: Int): Source[C, NotUsed] = {
  val mainSource: Source[A, NotUsed] = getMySource
  val filterSource: Source[Int, NotUsed] = Source.future(getFilterList(filterCriteria))

  // if List[A] contains a B, for example, convert B -> C in the output stream; otherwise, drop it

Can I combine these two sources in a way that accomplishes this?


Do you expect to fetch a (new) list of Bs to compare with for each element A or just one list of B at the start of the stream of A:s and then compare each A with that same list?

Good question - should have made that clear. I am wondering how to do it either way:

  1. Retrieve the List[B] once and apply to each item in stream A
  2. Retrieve List[B] for every instance of A

For 1 there’s basically two options - delay starting the stream until the Bs are there, or start the stream right away and compose with the B. Easiest and with least overhead is the delaying the stream until the future completes:

val futureBs: Future[List[B]] = getBs
val delayedUntilBsReady: Source[A, NotUsed] = 
  Source.futureSource( => 
    // once bs are ready, create stream of As 
    getMySource.filter(a => bs.something(a))))

More about Source.futureSource in the docs

For nr 2 fetching a new list of Bs for each, one way would be:

getMySource.mapAsync(parallelism = 4)(a => getBs().map(bs => (a -> bs))
  .filter { case (a, bs) => bs.something(a) }

More about mapAsync in the docs

Thank you!