Using a Flow recursively and merging all the results

Hi,

I’m trying to fetch data from a GTS database. This database only supports fetching with now (a timestamp) and n (the maximum number of records per request). In this example I don’t use the limit parameter for the overall fetch yet; for testing, each request fetches a batch of 3 GTS.

So, I want to chain the fetches batch by batch: from the last fetched batch, I take the oldest date and use it to fetch the next batch with the same now/limit pattern. I’m trying to do all of this recursively with Akka Streams.

So here is the code:

  def rangedFetch(
    readToken: String,
    query: Query[StartStopRangeMicros],
    limit: Long
  )(
    implicit warpClientContext: WarpClientContext
  ): Flow[Query[StartStopRangeMicros], Seq[GTS], NotUsed] = {
    val uuid = UUID.randomUUID
    Flow[Query[StartStopRangeMicros]]
      .map(query => query.copy(range = RecordsSinceMicros(query.range.newestDate, 3)))
      .map(query => fetchRequest(readToken, query))
      .map(request => (request -> uuid)) // cf. https://doc.akka.io/docs/akka-http/current/client-side/host-level.html
      .via(warpClientContext.poolClientFlow)
      .filter({ case (_, key) => key == uuid })
      .map({ case (responseTry, _) => responseTry })
      .via(processResponseTry)
      .via(byteStringToGTS)
      .fold(Seq[GTS]()) { case (seq, item) => { seq :+ item }}
      .flatMapConcat { gtsSeq: Seq[GTS] =>
        val currentOldestGTS = gtsSeq.last
        val chainedNow = currentOldestGTS.ts.get
        if (chainedNow > query.range.oldestDate) {
          val newQuery = query.copy(range = StartStopRangeMicros(chainedNow, query.range.oldestDate))
          Source.combine(
            Source
              .single(gtsSeq),
            Source
              .single(newQuery)
              .via(rangedFetch(readToken, newQuery, limit))
          )(Concat(_))
        } else {
          Source
            .single(gtsSeq)
        }
      }
  }

The problem is getting all of the content rather than only the last result. With Source.combine, the method only emits the content of the last fetch, and I can’t understand why.

Maybe the way I’m doing this isn’t the best, and I’m open to any suggestion…

The following version works, but it is ugly:

  if (chainedNow > query.range.oldestDate) {
    val newQuery = query.copy(range = StartStopRangeMicros(chainedNow, query.range.oldestDate))
    println(newQuery)
    Source
      .single(newQuery)
      .via(rangedFetch(readToken, newQuery, limit))
      .flatMapConcat(gtsSeq2 =>
        Source
          .single(gtsSeq ++ gtsSeq2)
      )
  } else {
    Source
      .single(gtsSeq)
  }

This version only concatenates the results once all the rangedFetch calls have completed. That is a problem, because the limit can then only be applied after all the fetches are done, not during them (one by one).
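
To make the goal concrete, here is a minimal sketch of the behaviour I am after, written with plain Scala 2.13 collections rather than Akka. Everything in it is a hypothetical stand-in: Record, store, and this fetch are not the real GTS types or the Warp client, and I assume that now is inclusive, so the next request starts just below the oldest timestamp already received. Each batch is emitted as soon as it has been fetched, which is what would let a limit stop the chain early.

```scala
// Hypothetical stand-in for a GTS record; not the real type from the code above.
final case class Record(ts: Long)

object ChainedFetch {
  // Pretend database holding records with timestamps 1 to 10.
  val store: Vector[Record] = (1L to 10L).map(Record(_)).toVector

  // Hypothetical fetch: the n newest records with ts <= now (now assumed inclusive).
  def fetch(now: Long, n: Int): Vector[Record] =
    store.filter(_.ts <= now).sortBy(r => -r.ts).take(n)

  // Lazily emits one batch per fetch, newest first.
  // The state is the "now" to use for the next request.
  def rangedFetch(newest: Long, oldest: Long, batchSize: Int): Iterator[Vector[Record]] =
    Iterator.unfold(newest) { now =>
      if (now < oldest) None // the requested range is exhausted
      else {
        // Drop anything older than the requested range.
        val batch = fetch(now, batchSize).filter(_.ts >= oldest)
        if (batch.isEmpty) None // no more data
        else Some((batch, batch.last.ts - 1)) // continue just below the oldest record seen
      }
    }
}
```

For example, ChainedFetch.rangedFetch(10L, 4L, 3).flatten.take(5).toList only needs the first two batches to produce its five records; the third fetch is never made. Akka Streams has Source.unfold and Source.unfoldAsync with a similar shape, so this may be a cleaner base than a Flow that calls itself, but I have not checked that against the real client.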