Seq[Future] to streamed (chunked) response

streams

(Boris Capitanu) #1

I have a Seq[Future[JsObject]] representing several asynchronous computations, each of which will eventually produce a JSON object. I would like to stream each JSON object as soon as its computation finishes, so that the output follows the JSON Lines format (jsonlines.org).

So I basically want something like the following:

val futures: Seq[Future[JsObject]] = ...
val publisher: Publisher[JsObject] = <black magic involving the futures>
Ok.chunked(Source.fromPublisher(publisher))

Here is what I tried, unsuccessfully (I’m new to Play’s Iteratees, Enumerators, Enumeratees, etc.):

val futures: Seq[Future[JsObject]] = ...
val enumerators: Seq[Enumerator[JsObject]] = futures.map(_.map(Some(_))).map(Enumerator.generateM(_))
val interleavedEnumerator: Enumerator[JsObject] = Enumerator.interleave(enumerators)
val publisher: Publisher[JsObject] = IterateeStreams.enumeratorToPublisher(interleavedEnumerator)
Ok.chunked(Source.fromPublisher(publisher))

When executed, this hangs without producing any output, seemingly waiting forever for an EOF.
I chose “interleave” because it is supposed to detect when any of the enumerators is ready to produce an element and immediately push that element to the consuming iteratee.

I could have used Future.sequence(…) to convert the Seq[Future[…]] into a Future[Seq[…]], but then I would have to wait for all the futures to complete before the combined Future completes. That also means holding all the results in memory at the same time, which may not be feasible. Instead, I want each result to be streamed to the HTTP response as soon as its future completes.
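The difference between Future.sequence and reacting to each Future individually can be seen without Play or Akka. The standard-library sketch below (SequenceVsCallbacks and its calling-thread executor are illustrative names, not part of any library) completes three Promises by hand in the order 2, 0, 1: the per-future callbacks see each result immediately, while the Future returned by Future.sequence only completes after the last input and keeps the input order.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._

object SequenceVsCallbacks {
  // Runs each callback on the thread that completes the Promise (demo only: keeps the order deterministic)
  val callingThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor { def execute(r: Runnable): Unit = r.run() })

  /** Completes three promises in the order 2, 0, 1 and reports what each approach saw. */
  def demo(): (List[String], Boolean, Seq[String]) = {
    implicit val ec: ExecutionContext = callingThread
    val promises = Vector.fill(3)(Promise[String]())
    val futures = promises.map(_.future)

    // Per-future callbacks: each result is handled as soon as it exists
    val seen = ListBuffer.empty[String]
    futures.foreach(f => f.foreach(v => seen += v))

    // Future.sequence: a single Future that completes only when every input has
    val all = scala.concurrent.Future.sequence(futures)

    promises(2).success("two")
    promises(0).success("zero")
    val sequenceDoneEarly = all.isCompleted // still waiting for promises(1)
    promises(1).success("one")

    (seen.toList, sequenceDoneEarly, Await.result(all, 1.second))
  }
}
```

Calling SequenceVsCallbacks.demo() returns the callback order List("two", "zero", "one"), false for the early check, and the sequenced result in input order.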

I have seen several similar questions on Stack Overflow, but unfortunately none had a working solution.



Thank you in advance for any tips.

-Boris


#2

Hi,
I’m a newbie, but I thought studying this issue would help me learn something and maybe point you in the right direction. I hope other, more experienced users will chime in as well. (In other words, take this reply with a grain of salt.)

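Regarding the conversion from Seq[Future[JsObject]] to a Source, you don’t need to go through Enumerators at all. Also, I’m pretty sure generateM doesn’t do what you want: it evaluates its by-name argument again every time the next element is requested, and only stops when the Future yields None. Because each of your Futures always yields Some, none of the enumerators ever finishes, which would explain the endless wait for EOF.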
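generateM’s termination rule can be modelled synchronously: re-evaluate the argument on every pull and stop at the first None. The sketch below does that with a plain Iterator; generateModel is an illustrative name, not a Play API, and the real Enumerator is asynchronous. Handing it a value that is already Some repeats that value forever.

```scala
object GenerateModel {
  /** Hypothetical synchronous model of Enumerator.generateM (not Play's implementation):
    * `next` is re-evaluated on every pull, and the stream stops at the first None. */
  def generateModel[E](next: => Option[E]): Iterator[E] =
    Iterator.continually(next).takeWhile(_.isDefined).map(_.get)

  // Like the original snippet: the argument is a value that is already Some, so it repeats forever
  def neverEnding(value: Int): Iterator[Int] = {
    val alreadyComputed: Option[Int] = Some(value)
    generateModel(alreadyComputed)
  }

  // The argument yields Some on the first pull and None afterwards, so exactly one element is produced
  def exactlyOnce(value: Int): Iterator[Int] = {
    var emitted = false
    generateModel {
      if (emitted) None
      else { emitted = true; Some(value) }
    }
  }
}
```

GenerateModel.neverEnding(1).take(5).toList gives five 1s (without take it would never end), while GenerateModel.exactlyOnce(7).toList gives List(7).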

You could instead use the Source.fromIterator operator along with the mapAsync operator (or the mapAsyncUnordered operator).

So the way you would do it is:

// Pass a factory that creates a fresh iterator, so the Source can be materialized more than once
val source: Source[JsObject, _] = Source.fromIterator(() => futures.iterator).mapAsyncUnordered(2)(identity)
Ok.chunked(source)
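A caveat on the snippet above: the Futures are already running by the time they enter the stream, so mapAsyncUnordered(2)(identity) does not control how they execute. It only bounds how many of them the stream waits on at once, and a fast Future queued behind two slow ones still waits its turn. If strict completion order matters, one option is to reorder the Futures with Promises before they reach the stream. The sketch below uses only the standard library; inCompletionOrder is a hypothetical helper, not a Play or Akka API.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future, Promise}

object CompletionOrder {
  /** Hypothetical helper: returns Futures whose i-th element holds the i-th input to complete
    * (success or failure). The input Futures keep running exactly as before. */
  def inCompletionOrder[A](futures: Seq[Future[A]])(implicit ec: ExecutionContext): Seq[Future[A]] = {
    val slots = Vector.fill(futures.size)(Promise[A]())
    val nextSlot = new AtomicInteger(0)
    futures.foreach { f =>
      // Whichever input finishes next claims the next free slot
      f.onComplete(result => slots(nextSlot.getAndIncrement()).complete(result))
    }
    slots.map(_.future)
  }
}
```

Inside the action, something like Source(CompletionOrder.inCompletionOrder(futures).toList).mapAsync(1)(identity) should then emit every JsObject in the order the computations finish. The Futures still need a suitable ExecutionContext.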

However, this is only part of the story. Depending on how you create your Futures, you may run into ExecutionContext issues, and things won’t happen the way you expect.

The documentation for this issue is here and here, but what really helped me understand the problem was a simple example.

Consider the following controller:

package controllers

import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.actor.ActorSystem
import play.api.Logger
import scala.concurrent.{ExecutionContext, Future}

@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
                              (implicit executionContext: ExecutionContext, materializer: Materializer)
  extends AbstractController(cc) {

  def chunkedFromSource = Action {
    Logger.info("In the action")

    // No explicit ExecutionContext: these Futures use the injected one, i.e. Play's default pool
    val seqFut = Seq(
      Future {
        Logger.info("ZeroS has started")
        Thread.sleep(5000)
        Logger.info("ZeroS has finished")
      },
      Future {
        Logger.info("OneS has started")
        Thread.sleep(100)
        Logger.info("oneS has finished")
      },
      Future {
        Logger.info("twoS has started")
        Thread.sleep(50)
        Logger.info("twoS has finished")
      }
    )

    Logger.info("Just before rendering")

    Ok("hello\n")
  }
}

If you query this controller (I’ve mapped it to /test), e.g. with curl -N --raw http://localhost:9000/test, you get the following in the console:

[info] application - In the action
[info] application - Just before rendering
[info] application - ZeroS has started
[info] application - ZeroS has finished
[info] application - OneS has started
[info] application - oneS has finished
[info] application - twoS has started
[info] application - twoS has finished

And ‘hello’ appears only when twoS has finished.

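This is not what I expected, but it makes sense once you read the documentation mentioned above: the blocking Thread.sleep calls run on Play’s default thread pool, which is meant for non-blocking work. (That they run strictly one after another is most likely because Akka’s dispatchers batch Future tasks submitted from one of their own threads and run them on that same thread.) So what happens when we bring in our own execution context? Note that this is not the right way to do it; it’s just for the demo: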

package controllers

import java.util.concurrent.ForkJoinPool
import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.actor.ActorSystem
import play.api.Logger
import scala.concurrent.{ExecutionContext, Future}

@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
                              (implicit executionContext: ExecutionContext, materializer: Materializer)
  extends AbstractController(cc) {

  def chunkedFromSource = Action {
    // Demo only: a new pool is created on every request
    val myEc = ExecutionContext.fromExecutor(new ForkJoinPool(4))
    Logger.info("In the action")

    // Each Future is explicitly run on myEc instead of Play's default pool
    val seqFut = Seq(
      Future {
        Logger.info("ZeroS has started")
        Thread.sleep(5000)
        Logger.info("ZeroS has finished")
      }(myEc),
      Future {
        Logger.info("OneS has started")
        Thread.sleep(100)
        Logger.info("oneS has finished")
      }(myEc),
      Future {
        Logger.info("twoS has started")
        Thread.sleep(50)
        Logger.info("twoS has finished")
      }(myEc)
    )

    Logger.info("Just before rendering")

    Ok("hello\n")
  }
}

Now, ‘hello’ is returned instantly and the console shows:

[info] application - In the action
[info] application - ZeroS has started
[info] application - OneS has started
[info] application - Just before rendering
[info] application - twoS has started
[info] application - twoS has finished
[info] application - oneS has finished
[info] application - ZeroS has finished

This is what we would expect.
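Because creating a ForkJoinPool inside the action is only for the demo, here is a standard-library sketch of the same idea done once: a dedicated pool for blocking work, shared by all requests and shut down explicitly. The BlockingWork object and its four-thread size are assumptions for illustration. In a real Play application the usual route is a custom dispatcher declared in application.conf and looked up through the ActorSystem (Play 2.6 also offers a CustomExecutionContext helper class for this); the thread pools page of the Play documentation describes the options.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object BlockingWork {
  // Created once and shared by every request; blocking calls go here, not to Play's default pool.
  // The pool size of 4 is an arbitrary choice for this sketch.
  val blockingEc: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  /** Runs `n` sleeping tasks on the shared pool and returns the names of the threads that ran them. */
  def sleepOnPool(n: Int, millis: Long): Set[String] = {
    implicit val ec: ExecutionContext = blockingEc
    val tasks = (1 to n).map { _ =>
      Future {
        Thread.sleep(millis) // stands in for a blocking call
        Thread.currentThread().getName
      }
    }
    Await.result(Future.sequence(tasks), 10.seconds).toSet
  }

  // Call once when the application stops (for example from a Play stop hook)
  def shutdown(): Unit = blockingEc.shutdown()
}
```

In the action, Future { ... }(BlockingWork.blockingEc) would then take the place of the per-request pool. Running four sleeping tasks on this pool uses four distinct threads, so they sleep in parallel rather than one after another.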

Now, going back to the chunked example, if we do this:

package controllers

import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.stream.scaladsl.Source
import akka.actor.ActorSystem
import play.api.Logger
import play.api.libs.json.{JsObject, Json}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
                              (implicit executionContext: ExecutionContext, materializer: Materializer)
  extends AbstractController(cc) {

  def chunkedFromSource = Action {
    //val myEc = ExecutionContext.fromExecutor(new ForkJoinPool(4))
    Logger.info("In the action")

    // No explicit ExecutionContext: the blocking work runs on Play's default pool
    val futures = (1 to 10).map { i =>
      Future {
        val sleepTime = Random.nextInt(i * 100)
        Logger.info(s"$i has started execution with $sleepTime")
        Thread.sleep(sleepTime)
        Logger.info(s"$i has finished sleeping")

        Json.obj("test" -> s"$i")
      } //(myEc)
    }

    // A fresh iterator per materialization; up to 2 Futures are awaited at a time
    val source: Source[JsObject, _] = Source.fromIterator(() => futures.iterator).mapAsyncUnordered(2)(identity)

    Logger.info("Just before rendering")

    Ok.chunked(source)
  }
}

We would get the following in the console (even though we’ve used mapAsyncUnordered):

[info] application - In the action
[info] application - Just before rendering
[info] application - 1 has started execution with 92
[info] application - 1 has finished sleeping
[info] application - 2 has started execution with 170
[info] application - 2 has finished sleeping
[info] application - 3 has started execution with 175
[info] application - 3 has finished sleeping
[info] application - 4 has started execution with 138
[info] application - 4 has finished sleeping
[info] application - 5 has started execution with 482
[info] application - 5 has finished sleeping
[info] application - 6 has started execution with 586
[info] application - 6 has finished sleeping
[info] application - 7 has started execution with 231
[info] application - 7 has finished sleeping
[info] application - 8 has started execution with 393
[info] application - 8 has finished sleeping
[info] application - 9 has started execution with 191
[info] application - 9 has finished sleeping
[info] application - 10 has started execution with 121
[info] application - 10 has finished sleeping

And the results appear in the terminal only after the last blocking call has finished.

If we provide our own ExecutionContext:

package controllers

import java.util.concurrent.ForkJoinPool
import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.stream.scaladsl.Source
import akka.actor.ActorSystem
import play.api.Logger
import play.api.libs.json.{JsObject, Json}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
                              (implicit executionContext: ExecutionContext, materializer: Materializer)
  extends AbstractController(cc) {

  def chunkedFromSource = Action {
    // Demo only: a new pool is created on every request
    val myEc = ExecutionContext.fromExecutor(new ForkJoinPool(4))
    Logger.info("In the action")

    val futures = (1 to 10).map { i =>
      Future {
        val sleepTime = Random.nextInt(i * 100)
        Logger.info(s"$i has started execution with $sleepTime")
        Thread.sleep(sleepTime)
        Logger.info(s"$i has finished sleeping")

        Json.obj("test" -> s"$i")
      }(myEc) // the blocking work runs on myEc, not on Play's default pool
    }

    // A fresh iterator per materialization; up to 2 Futures are awaited at a time
    val source: Source[JsObject, _] = Source.fromIterator(() => futures.iterator).mapAsyncUnordered(2)(identity)

    Logger.info("Just before rendering")

    Ok.chunked(source)
  }
}

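The results stream as soon as they are available (not in input order, and not strictly in completion order either: mapAsyncUnordered(2) only waits on two Futures at a time, so in this run 7 finishes before 3 and 6 but is emitted after them). Only four computations run at once, because myEc has four threads. The console shows: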

[info] application - In the action
[info] application - 1 has started execution with 27
[info] application - 3 has started execution with 126
[info] application - 2 has started execution with 33
[info] application - 4 has started execution with 67
[info] application - Just before rendering
[info] application - 1 has finished sleeping
[info] application - 5 has started execution with 310
[info] application - 2 has finished sleeping
[info] application - 6 has started execution with 166
[info] application - 4 has finished sleeping
[info] application - 7 has started execution with 24
[info] application - 7 has finished sleeping
[info] application - 8 has started execution with 171
[info] application - 3 has finished sleeping
[info] application - 9 has started execution with 593
[info] application - 6 has finished sleeping
[info] application - 10 has started execution with 837
[info] application - 8 has finished sleeping
[info] application - 5 has finished sleeping
[info] application - 9 has finished sleeping
[info] application - 10 has finished sleeping

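and in the terminal (curl --raw shows the chunked transfer framing: each hexadecimal line is the size in bytes of the chunk that follows, and the final 0 marks the end of the response):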

c
{"test":"1"}
c
{"test":"2"}
c
{"test":"4"}
c
{"test":"3"}
c
{"test":"6"}
c
{"test":"7"}
c
{"test":"8"}
c
{"test":"5"}
c
{"test":"9"}
d
{"test":"10"}
0
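Strictly speaking, this is not yet the JSON Lines format from the question: each chunk carries a bare object with no trailing newline (c is 12 bytes, exactly the length of {"test":"1"}). In Play, mapping the source through something like js => Json.stringify(js) + "\n" before Ok.chunked should fix that. The standard-library sketch below, whose chunk and jsonLine helpers are illustrative names, reproduces the HTTP/1.1 chunk framing (RFC 7230, section 4.1) so the sizes can be checked with and without the newline.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object ChunkFraming {
  /** One HTTP/1.1 chunk: payload size in bytes as hexadecimal, CRLF, payload, CRLF. */
  def chunk(payload: String): String =
    s"${Integer.toHexString(payload.getBytes(UTF_8).length)}\r\n$payload\r\n"

  /** The zero-size chunk that ends a chunked response (no trailers). */
  val lastChunk: String = "0\r\n\r\n"

  /** JSON Lines: one JSON value per line, each terminated by a newline. */
  def jsonLine(json: String): String = json + "\n"
}
```

With the newline added, {"test":"1"} becomes a d chunk and {"test":"10"} an e chunk.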

I hope this helps.

KriStof