Hi,
I’m a newbie, but I thought studying this issue would help me learn something and, maybe point you in the right direction. I hope other, more experienced users, will chime in as well. (What I mean is : you should take this reply with a grain of salt).
Regarding the conversion from Seq[Future[JsObject]]
to a Source
, you don’t need to go through Enumerators (and I’m pretty sure that generateM doesn’t do what you want).
You could instead use the Source.fromIterator operator along with the mapAsync operator (or the mapAsyncUnorderedoperator).
So the way you would do it is :
val futIterator = futures.toIterator
val source: Source[JsObject,_] = Source.fromIterator(() =>futIterator).mapAsyncUnordered(2)(identity)
Ok.chunked(source)
However, this is only part of the story. Depending on how you construct your Futures, you will run into ExecutionContext issues and things won’t happen as you would expect them to.
While the documentation for this issue is here and here, to me a simple example helped me understand the problem.
Consider the following controller :
package controllers
import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.actor.ActorSystem
import play.api.Logger
import scala.concurrent.{ExecutionContext, Future, Promise}
@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
(implicit executionContext: ExecutionContext, materializer: Materializer) extends AbstractController(cc) {
def chunkedFromSource = Action {
Logger.info("In the action")
val seqFut = Seq(
Future {
Logger.info("ZeroS has started")
Thread.sleep(5000)
Logger.info("ZeroS has finished")
},
Future {
Logger.info("OneS has started")
Thread.sleep(100)
Logger.info("oneS has finished")
},
Future {
Logger.info("twoS has started")
Thread.sleep(50)
Logger.info("twoS has finished")
}
)
Logger.info("Just before rendering")
Ok("hello\n")
}
If you query this controller (I’ve mapped it to test) by doing eg curl -N --raw http://localhost:9000/test
You get the following in the console :
[info] application - In the action
[info] application - Just before rendering
[info] application - ZeroS has started
[info] application - ZeroS has finished
[info] application - OneS has started
[info] application - oneS has finished
[info] application - twoS has started
[info] application - twoS has finished
And ‘hello’ appears only when twoS has finished.
This is not what I expected. But it makes sense when you read the aforementioned documentation. So what happens when we bring in our own execution context (note that this is not the way to do it, it’s just for the demo):
package controllers
import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.actor.ActorSystem
import play.api.Logger
import scala.concurrent.{ExecutionContext, Future, Promise}
@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
(implicit executionContext: ExecutionContext, materializer: Materializer) extends AbstractController(cc) {
def chunkedFromSource = Action {
val myEc = ExecutionContext.fromExecutor(new ForkJoinPool(4))
Logger.info("In the action")
val seqFut = Seq(
Future {
Logger.info("ZeroS has started")
Thread.sleep(5000)
Logger.info("ZeroS has finished")
} (myEc),
Future {
Logger.info("OneS has started")
Thread.sleep(100)
Logger.info("oneS has finished")
} (myEc),
Future {
Logger.info("twoS has started")
Thread.sleep(50)
Logger.info("twoS has finished")
} (myEc)
)
Logger.info("Just before rendering")
Ok("hello\n")
}
Now, ‘hello’ is returned instantly and the console shows :
[info] application - In the action
[info] application - ZeroS has started
[info] application - OneS has started
[info] application - Just before rendering
[info] application - twoS has started
[info] application - twoS has finished
[info] application - oneS has finished
[info] application - ZeroS has finished
Which is what we would expect.
Now going back to the chunked example, if we do this :
package controllers
import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.actor.ActorSystem
import play.api.Logger
import scala.concurrent.{ExecutionContext, Future, Promise}
@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
(implicit executionContext: ExecutionContext, materializer: Materializer) extends AbstractController(cc) {
def chunkedFromSource = Action {
//val myEc = ExecutionContext.fromExecutor(new ForkJoinPool(4))
Logger.info("In the action")
val futures = (1 to 10).map { i =>
Future {
val sleepTime = Random.nextInt(i * 100)
Logger.info(s"$i has started execution with $sleepTime")
Thread.sleep(sleepTime)
Logger.info(s"$i has finished sleeping")
Json.obj("test" -> s"$i")
} //(myEc)
}
val test = futures.toIterator
val source: Source[JsObject,_] = Source.fromIterator(() =>test).mapAsyncUnordered(2)(identity)
Logger.info("Just before rendering")
Ok.chunked(source)
}
We would get the following in the console (even though we’ve used mapAsyncUnordered):
[info] application - In the action
[info] application - Just before rendering
[info] application - 1 has started execution with 92
[info] application - 1 has finished sleeping
[info] application - 2 has started execution with 170
[info] application - 2 has finished sleeping
[info] application - 3 has started execution with 175
[info] application - 3 has finished sleeping
[info] application - 4 has started execution with 138
[info] application - 4 has finished sleeping
[info] application - 5 has started execution with 482
[info] application - 5 has finished sleeping
[info] application - 6 has started execution with 586
[info] application - 6 has finished sleeping
[info] application - 7 has started execution with 231
[info] application - 7 has finished sleeping
[info] application - 8 has started execution with 393
[info] application - 8 has finished sleeping
[info] application - 9 has started execution with 191
[info] application - 9 has finished sleeping
[info] application - 10 has started execution with 121
[info] application - 10 has finished sleeping
And, only after the last blocking call would the results appear in the terminal.
If we provide our ExecutionContext,
package controllers
import javax.inject._
import play.api.mvc._
import akka.stream._
import akka.actor.ActorSystem
import play.api.Logger
import scala.concurrent.{ExecutionContext, Future, Promise}
@Singleton
class HomeController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)
(implicit executionContext: ExecutionContext, materializer: Materializer) extends AbstractController(cc) {
def chunkedFromSource = Action {
val myEc = ExecutionContext.fromExecutor(new ForkJoinPool(4))
Logger.info("In the action")
val futures = (1 to 10).map { i =>
Future {
val sleepTime = Random.nextInt(i * 100)
Logger.info(s"$i has started execution with $sleepTime")
Thread.sleep(sleepTime)
Logger.info(s"$i has finished sleeping")
Json.obj("test" -> s"$i")
} (myEc)
}
val test = futures.toIterator
val source: Source[JsObject,_] = Source.fromIterator(() =>test).mapAsyncUnordered(2)(identity)
Logger.info("Just before rendering")
Ok.chunked(source)
}
The results stream immediately (here not ordered), and we get the following in the console :
[info] application - In the action
[info] application - 1 has started execution with 27
[info] application - 3 has started execution with 126
[info] application - 2 has started execution with 33
[info] application - 4 has started execution with 67
[info] application - Just before rendering
[info] application - 1 has finished sleeping
[info] application - 5 has started execution with 310
[info] application - 2 has finished sleeping
[info] application - 6 has started execution with 166
[info] application - 4 has finished sleeping
[info] application - 7 has started execution with 24
[info] application - 7 has finished sleeping
[info] application - 8 has started execution with 171
[info] application - 3 has finished sleeping
[info] application - 9 has started execution with 593
[info] application - 6 has finished sleeping
[info] application - 10 has started execution with 837
[info] application - 8 has finished sleeping
[info] application - 5 has finished sleeping
[info] application - 9 has finished sleeping
[info] application - 10 has finished sleeping
and in the terminal
c
{"test":"1"}
c
{"test":"2"}
c
{"test":"4"}
c
{"test":"3"}
c
{"test":"6"}
c
{"test":"7"}
c
{"test":"8"}
c
{"test":"5"}
c
{"test":"9"}
d
{"test":"10"}
0
I hope this helps.
KriStof