Iteratively create and concat Sources based on previous Source output

I have an HTTP service that streams responses for a maximum time duration. After this timeout is reached, the stream will stop even if all desired data has not yet been retrieved. In order to obtain all the desired data, my application must use the last object returned in the stream to decide if and how to create a new request.

Presently, I have a client side Reactive Streams publisher that needs to be integrated into my Akka Streams based business logic. I am struggling to find a clean way of composing multiple streams of paged data, where one stream depends on the last output of the previous stream, into a single stream of paged data.

The following is an example of what I’m trying to do.

// A simplified response object containing all info needed for this paging algorithm
case class Page(maxTime: Timestamp, size: Int)

// This is the client side API I'd prefer to use, as developing directly in Reactive Streams is painful
def fetchFromServer(startTime: Timestamp, targetPageSize: Int): Source[Page, NotUsed]

// This is the method the business logic would like to use
def fetch(startTime: Timestamp): Source[Page, NotUsed] = {
  val targetPageSize = 1000
  // TODO: What do I do here? I know this is completely bogus:
  // a Source has no blocking .last, and this concat never feeds back into the stream.
  val source = fetchFromServer(startTime, targetPageSize)
  while (true) {
    val lastPage = source.last
    if (lastPage.size < targetPageSize) break
    source.concat(fetchFromServer(lastPage.maxTime + 1, targetPageSize))
  }
}

The stream returned by fetchFromServer terminates when the server stops sending data (no more data available OR timeout); however, the client side business logic should continue requesting data from the server until Page.size < targetPageSize.
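
Stated as code, the rule is roughly the following (nextStartTime is just a name for illustration, not part of my actual API):

// A short page means the server genuinely ran out of data; a full page may mean
// the timeout cut the response short, so the next request starts just past it
def nextStartTime(lastPage: Page, targetPageSize: Int): Option[Timestamp] =
  if (lastPage.size < targetPageSize) None
  else Some(lastPage.maxTime + 1)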

I suspect that using a Materialized Value might provide me with the magic sauce I need here, but I can’t seem to wrap my head around how to do so in a looping or recursive fashion.

I did find this post that is similar: Akka source restart with previous state as parameter

I feel like the proposed response in that post may not work well in my use case because this logic needs to live inside a larger Flow that processes the data on the fly.

I believe I found a somewhat crafty (a.k.a. a hack) solution to my question, though I also found some existing code at the Producer level that solves this exact problem. Code reuse FTW.

However, since I’ve seen this question asked a few times (myself included) and I’ve yet to see a solution that works, below are my thoughts. I’m not particularly happy that it uses a Throwable to drive normal business logic, but it should work.

The hack is to wrap your Source in a .statefulMapConcat(...) that either emits an empty Seq (a no-op) or throws a special ContinuePaging, a Throwable that skips stack-trace capture but carries the parameters needed to query the next page. Then use .recoverWithRetries(...) to catch only that Throwable, perform the next query, and rinse and repeat.

DISCLAIMER: The following is pseudocode that was never tested in a real application, and is documented here for academic purposes. Use at your own risk.

// The Throwable used to signal that a new query is needed. The constructor args
// (message, cause, enableSuppression, writableStackTrace) disable stack-trace
// capture, keeping it cheap since it carries control flow rather than an error.
case class ContinuePaging(maxTime: Timestamp) extends Throwable(null, null, true, false)

// When the stream from the server terminates prematurely, throw ContinuePaging
def fetchFromServer(startTime: Timestamp, targetPageSize: Int): Source[Page, NotUsed] =
  Source.fromPublisher[Page](??? /* query the client, obtaining the publisher */)
    .map(Right(_))
    .concat(Source.single(Left(Done))) // sentinel marking normal upstream completion
    .statefulMapConcat(() => new PagingRequiredValidator(targetPageSize))

// This stateful function decides whether another paging query is needed when the stream terminates
class PagingRequiredValidator(targetPageSize: Int) extends (Either[Done, Page] => Seq[Page]) {
  var lastSeenTimeForPaging: Option[Timestamp] = None
  override def apply(v1: Either[Done, Page]): Seq[Page] = v1 match {
    case Right(page) =>
      // Remember where the next query would start, but only if this page looks full
      lastSeenTimeForPaging = getLastSeenTimeForPaging(targetPageSize, page)
      Seq(page)
    case Left(Done) =>
      // Upstream completed: a remembered time means the timeout likely cut us short
      if (lastSeenTimeForPaging.isDefined) throw ContinuePaging(lastSeenTimeForPaging.get)
      Seq.empty
  }
}

// Returns Some only if there might be pages after the given one: a full page may
// mean the server's timeout cut the response short (per the rule stated above)
def getLastSeenTimeForPaging(targetPageSize: Int, page: Page): Option[Timestamp] =
  if (page.size >= targetPageSize) Some(page.maxTime) else None

// This is the method the business logic would like to use
def fetch(startTime: Timestamp): Source[Page, NotUsed] = {
  val targetPageSize = 1000
  fetchFromServer(startTime, targetPageSize)
    .recoverWithRetries(Int.MaxValue, continuePaging(targetPageSize))
}

// The retry function that executes new paging requests to concat to the original stream
def continuePaging(targetPageSize: Int): PartialFunction[Throwable, Source[Page, NotUsed]] = {
  case ContinuePaging(lastTime) => fetchFromServer(lastTime + 1, targetPageSize)
}
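
Once composed this way, the paging is invisible downstream; the business logic consumes it like any other Source. A minimal consumption sketch, assuming an implicit ActorSystem (which provides the materializer in recent Akka versions) and some startTime value are in scope:

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("paging")

// Retries triggered by ContinuePaging are stitched in upstream;
// the consumer sees one continuous stream of pages
fetch(startTime).runWith(Sink.foreach(page => println(s"page of ${page.size} up to ${page.maxTime}")))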

This is a rather complicated solution for what feels like something that should be relatively easy to accomplish. Plus, the use of a Throwable to drive this paging logic reeks of code smell. Every time I’ve wanted to leverage a pattern like this in the past, I’ve ultimately approached the problem from a different direction. In this case, I’ve been able to leverage complexity spent elsewhere, but I’d still like to find something more idiomatic to Akka Streams.

Hi… the solution needs to be scalable - it would be dealing with a live connection and thousands of records, so hard-coding IDs and combinations isn’t really an option!
At the moment, it seems as though doing the concatenation at the custom SQL query level is the best (only?!) conceivable approach, so it may be a case of finding the correct methodology there. Will keep digging!

I wouldn’t be so sure this proposed solution can’t scale. Note how the pseudocode uses a custom Throwable that does not fill in the stack trace. This should allow the Throwable to be created without the overhead traditionally associated with Exceptions. However, this approach is still untested, so I cannot say with the utmost confidence that it will work as expected.

If you are talking about paging at the DB level, I’d expect my proposed approach to work. The retry is used to relay only the information needed to create the next DB query from a paging perspective. That said, we’ve used Source.unfoldAsync quite effectively for this use case. The hard part is optimizing your paging algorithm.
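
To sketch what I mean (untested; fetchViaUnfold is illustrative, and it assumes an implicit Materializer and ExecutionContext are in scope): each unfold step drains one server response, then decides whether another query is warranted.

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

def fetchViaUnfold(startTime: Timestamp, targetPageSize: Int = 1000)(
    implicit mat: Materializer, ec: ExecutionContext): Source[Page, NotUsed] =
  Source
    .unfoldAsync[Option[Timestamp], immutable.Seq[Page]](Some(startTime)) {
      case None => Future.successful(None) // a previous step decided we were done
      case Some(time) =>
        // Drain one server response, then decide whether to keep paging
        fetchFromServer(time, targetPageSize).runWith(Sink.seq).map { pages =>
          val next = pages.lastOption.collect {
            case last if last.size >= targetPageSize => last.maxTime + 1
          }
          Some((next, pages))
        }
    }
    .mapConcat(identity) // flatten each drained response back into individual pages

The tradeoff is that each response is buffered in memory before its pages flow downstream, which may or may not be acceptable when processing data on the fly; the recoverWithRetries version keeps elements streaming through as they arrive.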

In my experience, each DB query should be crafted to execute extremely fast, and each paging query should be non-overlapping while still returning 100% of the desired results (i.e. no gaps between pages). Overlapping results require more filtering code on the application side, which is doable but more complicated… we do it. We found that streaming directly from the DB did not scale well (slow queries == both long lived transactions and resource starvation), and how you page efficiently depends on the DB. For example, Postgres’s limit and offset for paging perform horribly. If I’m reading our graphs correctly, we are performing up to 2,000 requests per second and returning up to 80,000 records/second with capacity to spare. We have 1 DB server with over 700 million rows in the target table (and growing), plus 3 app servers, each with 10 DB connections and servicing upwards of 300 requests per second.
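
Concretely, the non-overlapping / no-gaps property usually comes from keyset ("seek") paging rather than limit/offset: the previous page’s max key becomes the next page’s lower bound, so the query can walk an index instead of counting past skipped rows. A hypothetical query (table and column names invented for illustration):

// Keyset paging: pages never overlap and, with a unique key, never skip rows.
// If the key column is not unique, extend it with a tie-breaker column.
val pageQuery =
  """SELECT event_time, payload
    |FROM events
    |WHERE event_time > ?   -- maxTime from the previous page
    |ORDER BY event_time
    |LIMIT ?                -- targetPageSize
    |""".stripMargin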

Note that the paging behavior I’ve described here should work essentially the same for both the unfoldAsync approach and this recoverWithRetries approach. The difference is in the mechanics of executing the many paged queries and wiring their results into a single stream of results.