Unmarshaller.map() creates an unmarshaller that ignores the passed ExecutionContext

One would expect that this code:

unmarshaller.unmarshal(entity, materializer)

will perform the unmarshalling in the ExecutionContext of the specified materializer.

However, this is not the case!
If the unmarshaller was created using the Unmarshaller.map() function, there are cases where the unmarshalling is performed right on the caller's thread, not on the provided ExecutionContext.
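Here is a minimal Scala sketch that reproduces the surprise (the toy unmarshaller, the pool, and the println are only for illustration; Materializer(system) assumes a recent Akka version):

  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import akka.actor.ActorSystem
  import akka.http.scaladsl.unmarshalling.Unmarshaller
  import akka.stream.Materializer

  implicit val system: ActorSystem = ActorSystem("demo")
  implicit val mat: Materializer = Materializer(system)
  // A dedicated pool that we intend the mapping to run on.
  implicit val dedicated: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  val um: Unmarshaller[String, String] =
    Unmarshaller.strict[String, String](identity)
      .map { s => println(s"map runs on: ${Thread.currentThread.getName}"); s }

  // The upstream future is already fulfilled, so the map function runs
  // right here on the calling thread; `dedicated` is never consulted.
  um("payload")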

This is because of the usage of FastFuture inside Unmarshaller:

  def map[C](f: B => C): Unmarshaller[A, C] =
    transform(implicit ec => _ => _.fast map f)

FastFuture.map() calls out to transformWith():

  def transformWith[B](s: A => Future[B], f: Throwable => Future[B])(implicit executor: ExecutionContext): Future[B] = {
    def strictTransform[T](x: T, f: T => Future[B]) =
      try f(x)
      catch { case NonFatal(e) => ErrorFuture(e) }

    future match {
      case FulfilledFuture(a) => strictTransform(a, s)
      // ...

Here it is: in the FulfilledFuture case on the last line, strictTransform invokes the function directly, instead of scheduling the computation on the specified ExecutionContext.
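To make the difference concrete, here is a small sketch (assuming akka-http's FastFuture is on the classpath) contrasting it with a plain Future:

  import scala.concurrent.{ExecutionContext, Future}
  import akka.http.scaladsl.util.FastFuture._

  def demo(pool: ExecutionContext): Unit = {
    implicit val ec: ExecutionContext = pool
    val done = Future.successful(21)

    // Plain Future.map: the function is scheduled onto `pool`,
    // even though `done` is already completed.
    done.map(_ * 2)

    // FastFuture map: `done` is already completed, so strictTransform
    // applies the function immediately on the calling thread and
    // `pool` is never used.
    done.fast.map(_ * 2)
  }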

I think that this is very confusing and non-intuitive. Why is FastFuture used in Unmarshaller.map? Maybe Unmarshaller should offer two versions: one for the case where the map function is trivial, using FastFuture, and another using a regular Future for the case where the map function actually does the hard work, as it does in our code:

  Unmarshaller.forMediaType(MediaTypes.APPLICATION_JSON, Unmarshaller.entityToString())
    .thenApply { s -> fromJSON(mapper, s, expectedType) }

Scheduling async tasks comes at a cost (creating a task, enqueueing it, waiting for a thread to pick it up), so when it is possible and safe to avoid that cost for minimal work, doing so is often a good idea to keep things as fast as possible.

FastFuture optimizes away as many of those task schedulings as possible: it observes whether the future is already completed so it can invoke operations directly, and it invokes transforming operations on the same thread that provided the successful value. (This is much like how Akka Streams fuses operators together to execute in a single actor/thread, but for Future.)

Can you describe the specific use case where you would rather trade marshalling speed for that control over where the map function executes?

I understand that FastFuture is an optimization. The problem is that this optimization isn’t obvious from the API, which can result in CPU-intensive tasks running on the main dispatcher, leading to starvation.

I believe that many people writing in strongly, statically typed languages are used to APIs that convey as much as possible in their type signatures. In this case, there’s a map that takes a function and creates an Unmarshaller. Nothing indicates that this operation is expected to be “fast” (meaning, in this case, not CPU-heavy). Next, this map returns an object that looks just like any other Unmarshaller, so the caller supplies a suitable ExecutionContext and expects the CPU-heavy computation to be executed there. The fact that the heavy computation will sometimes be executed in the context where unmarshal is called is completely unexpected.

Can you describe the specific use case where you would rather trade marshalling speed for that control over where the map function executes?

Sure. We need to create an Unmarshaller that parses JSON using our instance of Jackson’s ObjectMapper. As I mentioned in the OP, one of our developers used this construction:

  fun <T> unmarshaller(mapper: ObjectMapper, expectedType: TypeReference<T>): Unmarshaller<HttpEntity, T> =
    Unmarshaller.forMediaType(MediaTypes.APPLICATION_JSON, Unmarshaller.entityToString())
      .thenApply { s -> fromJSON(mapper, s, expectedType) }

where fromJSON essentially calls out to mapper to do the parsing.
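(fromJSON itself is not shown here; presumably it is just a thin wrapper over Jackson, something like this hypothetical sketch:)

  import com.fasterxml.jackson.core.`type`.TypeReference
  import com.fasterxml.jackson.databind.ObjectMapper

  // Hypothetical shape of the fromJSON helper: plain, CPU-bound
  // parsing that runs on whatever thread happens to invoke it.
  def fromJSON[T](mapper: ObjectMapper, s: String, expectedType: TypeReference[T]): T =
    mapper.readValue(s, expectedType)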

The usual convention for a function that returns a Future/CompletionStage is that it may do whatever amount of non-blocking work and return either an already completed or a still-pending Future. If you pass an ExecutionContext, it may use it to schedule some handlers, but it will always execute some work directly on the caller’s stack. I wouldn’t assume anything stricter.
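For instance, this perfectly conventional Future-returning function (a sketch; quickCalculation is a stand-in) does all of its work on the caller’s stack:

  import scala.concurrent.Future

  def quickCalculation(): Int = (1 to 1000).sum // stand-in for some non-blocking work

  def compute(): Future[Int] = {
    val n = quickCalculation() // executes synchronously, on the caller's stack
    Future.successful(n)       // an already completed Future is returned
  }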

Whether to treat CPU-heavy tasks as blocking or not needs to be decided case by case. In any case, the optimization that @johanandren mentioned above is only possible this way. On the other hand, it’s always possible to wrap a computation that you know is heavy in an extra wrapper to execute it on a different pool.

That said, it seems that Unmarshaller.unmarshal has a misleading javadoc comment hinting that you could pass in an EC that would be used to avoid running “heavy” code on the calling stack. This isn’t true in general. In most cases, Unmarshallers are implemented to execute eagerly when supplied with strict data.

As a workaround I would wrap the parts that are heavy in Future {} or CompletableFuture.supplyAsync.
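In Unmarshaller terms that could look roughly like this (a sketch, not a recommendation of a specific API: heavyPool and parse are placeholders, and Unmarshaller.apply is the Future-returning constructor of the Scala DSL):

  import java.util.concurrent.Executors
  import scala.concurrent.{ExecutionContext, Future}
  import akka.http.scaladsl.unmarshalling.Unmarshaller

  val heavyPool: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  def heavyUnmarshaller[T](parse: String => T): Unmarshaller[String, T] =
    Unmarshaller { _ => s =>
      // Schedule the CPU-heavy parse on the dedicated pool explicitly,
      // instead of relying on the EC passed to unmarshal().
      Future(parse(s))(heavyPool)
    }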

Thanks for the reply @jrudolph (and @johanandren). I think in general I agree with your comment. However, the confusion is also real.

it seems that Unmarshaller.unmarshal has a misleading javadoc comment hinting that you could pass in an EC that would be used to avoid running “heavy” code on the calling stack. This isn’t true in general. In most cases, Unmarshallers are implemented to execute eagerly when supplied with strict data.

Right, this is actually my point. Should we create an issue to change the doc?

As a workaround I would wrap the parts that are heavy in Future {} or CompletableFuture.supplyAsync.

But the heavy parts happen inside the plain function B => C that is the argument to Unmarshaller.map(). Do you suggest re-typing all the unmarshallers we’re using from Unmarshaller[A, C] to Unmarshaller[A, Future[C]]? Or wrapping the call to Unmarshaller.unmarshal() in some other Future? Or using some other API?