How can I make sure that `FileIO.fromPath` is picking up my dispatcher?

I’m trying to change the underlying dispatcher of FileIO. The docs say that it uses akka.stream.blocking-io-dispatcher, so I configured it like this:

akka.stream {
  blocking-io-dispatcher = "my-dispatcher"
}

my-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 10
  }
  throughput = 1
}

But just to be sure, I also set it explicitly in code:

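  import akka.stream.ActorAttributes
  import akka.stream.scaladsl.FileIO
  import akka.util.ByteString
  import com.sksamuel.scrimage.ImmutableImage

  FileIO
    .fromPath(source.value)
    .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
    .log(s"reading ${source.value}")
    .fold(ByteString.empty)(_ ++ _)
    .map(_.toArray)
    .map(ImmutableImage.loader().fromBytes)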

But the log statements are still being written using the default dispatcher. Why?

12:01:47.624UTC DEBUG[default-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://default/system/Materializers/StreamSupervisor-0)] - [reading /Users/oliver/Downloads/e7lp1y1tuwt41.jpg] Element: [...]

Is the underlying file operation still done on my thread pool? Is logging just working on another thread pool? Something wrong with my assumptions or configuration?

How can I prove that the reading is done by my own thread pool?

You should probably create your own Materializer and pass it to runWith.

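      import java.nio.file.Paths

      import akka.actor.ActorSystem
      import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
      import akka.stream.scaladsl.{FileIO, Sink}
      import akka.util.ByteString

      // Akka 2.5-style materializer API (deprecated in Akka 2.6)
      val as          = ActorSystem("test")
      val settings    = ActorMaterializerSettings(as)
      val newSettings = settings.withDispatcher("my-dispatcher")
      val mat         = ActorMaterializer(newSettings)(as)

      FileIO
        .fromPath(Paths.get("/tmp/toto.txt"))
        .fold(ByteString.empty)(_ ++ _)
        .map(_ => println(s"Current thread: ${Thread.currentThread().getName}"))
        .runWith(Sink.ignore)(mat)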

logs:

Current thread: test-my-dispatcher-11

Or you can just specify in the config:

akka.stream.materializer {
  dispatcher = "my-dispatcher"
  blocking-io-dispatcher = "my-dispatcher"
}
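A minimal, self-contained sketch of the config-only approach, assuming Akka 2.6 (where an implicit ActorSystem is enough to run a stream) and reusing the my-dispatcher definition from the question. The object name MaterializerDispatcherFromConfig and the single-element source are hypothetical stand-ins. The map reuses the thread-name trick from the code above, so with this config it should report a test-my-dispatcher-N thread.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

object MaterializerDispatcherFromConfig {
  // Dispatcher definition from the question plus the materializer-wide setting.
  private val config = ConfigFactory
    .parseString(
      """my-dispatcher {
        |  type = Dispatcher
        |  executor = "thread-pool-executor"
        |  thread-pool-executor.fixed-pool-size = 10
        |  throughput = 1
        |}
        |akka.stream.materializer.dispatcher = "my-dispatcher"
        |""".stripMargin)
    .withFallback(ConfigFactory.load())

  /** Runs a one-element stream and returns the thread name seen inside `map`. */
  def streamThreadName(): String = {
    // Akka 2.6: the implicit ActorSystem provides the system materializer.
    implicit val system: ActorSystem = ActorSystem("test", config)
    try {
      val name = Source
        .single("x")
        .map(_ => Thread.currentThread().getName)
        .runWith(Sink.head)
      Await.result(name, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"Current thread: ${streamThreadName()}")
}
```

Note that this setting changes the dispatcher for every stream materialized by that system, which is why it can be too broad.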

First of all: thanks for the answer. I would have used it as is if I were not using Akka 2.6. But what actually led me to the solution was this line:

.map(_ => println(s"Current thread: ${Thread.currentThread().getName}"))

I had not thought of doing that. When I added it, I saw that my stream was working correctly. It also sent me back to the documentation.

Turns out I had some misconceptions:

Since the logging is done asynchronously the thread in which the logging was performed is captured in Mapped Diagnostic Context (MDC) with attribute name sourceThread.

That passage, from the Classic Logging page of the Akka documentation, explains it all.

  1. Of course it makes sense that the logging takes place on another thread and does not block the original thread.
  2. To print the thread that actually initiated the logging, you have to use [%X{sourceThread}] in your logback configuration. [%thread] would just print the logging thread, which is normally not very helpful (for me).

But I still had some problems:
On top of that, I am using Akka 2.6, which means that ActorMaterializerSettings and ActorMaterializer are deprecated.

akka.stream.blocking-io-dispatcher = "image-processing-dispatcher"
akka.stream.materializer.blocking-io-dispatcher = "processing-dispatcher"

Did NOT work for me (and would be too broad in my case anyway).

akka.stream.materializer.dispatcher = "my-dispatcher"

Did work, but is also too broad in my case.

So.

The SOLUTION for Akka 2.6 and my use case:

  1. Explicitly set the dispatcher on the stream:
     FileIO
       .fromPath(source.value)
       .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
       [...]
  2. Then use [%X{sourceThread}] in the logback appender configuration.

While the log operator may give you the wrong thread, there is something more hiding here:

The attribute is added on the stream graph constructed so far, and each operator you call adds to the graph. To make that more explicit:

(FileIO
    .fromPath(source.value)
    .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
).log(s"reading ${source.value}")
 .fold(ByteString.empty)(_ ++ _)

The custom dispatcher is set on the fromPath operator only, while the log and fold operators run on the default dispatcher. When the dispatcher attribute changes between operators, it introduces an asynchronous boundary, so fromPath will run in one actor and log + fold in another.
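The boundary can be observed with the same thread-name trick. In this sketch (Akka 2.6 assumed; BoundaryDemo and the single-element source are hypothetical, and the first map merely stands in for fromPath), the attribute wraps only the first part of the stream, so the operator after it should report a thread that is not from my-dispatcher (with default settings, a default-dispatcher thread).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

object BoundaryDemo {
  // Assumption: the same "my-dispatcher" as in the question.
  private val config = ConfigFactory
    .parseString(
      """my-dispatcher {
        |  type = Dispatcher
        |  executor = "thread-pool-executor"
        |  thread-pool-executor.fixed-pool-size = 10
        |  throughput = 1
        |}""".stripMargin)
    .withFallback(ConfigFactory.load())

  /** Returns (thread of the attributed part, thread of the operator after it). */
  def threads(): (String, String) = {
    implicit val system: ActorSystem = ActorSystem("test", config)
    try {
      val upstream = Source
        .single("x")
        .map(_ => Thread.currentThread().getName) // stands in for fromPath
        .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
      val result = upstream
        .map(up => (up, Thread.currentThread().getName)) // stands in for log + fold
        .runWith(Sink.head)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (attributed, downstream) = threads()
    println(s"attributed part: $attributed")
    println(s"downstream part: $downstream")
  }
}
```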

If you want to run more operators in the same dispatcher, you’d have to move the attribute down:

source.operator1
  .operator2
  .withAttributes(attr)

which actually means/is the same as:

((source.operator1).operator2).withAttributes(attr)

and does not add it only to the last operator (operator2), which might be what you’d expect.
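A sketch of the "move the attribute down" case under the same assumptions (Akka 2.6, my-dispatcher from the question, a hypothetical MovedAttributeDemo object, two map calls standing in for operator1 and operator2). Because the attribute is applied to the whole chain built so far, both operators should report a my-dispatcher thread.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

object MovedAttributeDemo {
  // Assumption: the same "my-dispatcher" as in the question.
  private val config = ConfigFactory
    .parseString(
      """my-dispatcher {
        |  type = Dispatcher
        |  executor = "thread-pool-executor"
        |  thread-pool-executor.fixed-pool-size = 10
        |  throughput = 1
        |}""".stripMargin)
    .withFallback(ConfigFactory.load())

  /** Returns (thread seen by operator1, thread seen by operator2). */
  def threads(): (String, String) = {
    implicit val system: ActorSystem = ActorSystem("test", config)
    try {
      val result = Source
        .single("x")
        .map(_ => Thread.currentThread().getName) // operator1
        .map(op1 => (op1, Thread.currentThread().getName)) // operator2
        // Same as ((source.operator1).operator2).withAttributes(...): covers both maps.
        .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
        .runWith(Sink.head)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(threads())
}
```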

To set it on operator2 only, you’d have to do:

source.operator1.via(Flow[T].operator2.withAttributes(attr))
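The via variant, again as a hedged sketch with hypothetical names (Akka 2.6 and the question's my-dispatcher assumed): only the Flow passed to via carries the attribute, so operator1 should stay off my-dispatcher (on the default dispatcher with default settings) while operator2 runs on my-dispatcher.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

object ViaFlowDemo {
  // Assumption: the same "my-dispatcher" as in the question.
  private val config = ConfigFactory
    .parseString(
      """my-dispatcher {
        |  type = Dispatcher
        |  executor = "thread-pool-executor"
        |  thread-pool-executor.fixed-pool-size = 10
        |  throughput = 1
        |}""".stripMargin)
    .withFallback(ConfigFactory.load())

  /** Returns (thread seen by operator1, thread seen by operator2). */
  def threads(): (String, String) = {
    implicit val system: ActorSystem = ActorSystem("test", config)
    try {
      // operator2 as its own Flow, so the attribute applies to it alone.
      val operator2 = Flow[String]
        .map(op1 => (op1, Thread.currentThread().getName))
        .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))

      val result = Source
        .single("x")
        .map(_ => Thread.currentThread().getName) // operator1, no attribute
        .via(operator2)
        .runWith(Sink.head)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(threads())
}
```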

Thanks for pointing that out. That was something I actually tripped over after making my realizations and did not address in my post above.

I went through the scenarios you describe: there were log statements after each operator in my code until I realized that you would (normally) want the dispatcher attribute as close to the run as possible (for simple flows, where you might want only one async boundary).