Writing each element to its own file, problem with the last element

I want to save each element in the stream to its own separate file. I'm not sure whether I'm missing something or there is a bug: for the last element the file is always created, but it is empty.

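import java.nio.file.Path
import java.nio.file.StandardOpenOption._

import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.scaladsl.{FileIO, Flow, Keep, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes an implicit ActorSystem named `system` is in scope.
val done =
  Source(1 to 4)
    .map(i => ByteString.fromString(i.toString))
    .toMat(
      LogRotatorSink.withSinkFactory(
        // Returns a new path for every element, so every element triggers a rotation.
        triggerGeneratorCreator =
          () => n => Some(Path.of(s"file${n.decodeString("UTF-8")}.txt")),
        sinkFactory =
          (path: Path) =>
            Flow[ByteString].toMat(FileIO.toPath(path, Set(CREATE, WRITE, TRUNCATE_EXISTING, SYNC)))(Keep.right)
      )
    )(Keep.right)
    .run()

Await.ready(done, 2.seconds)
system.terminate()
Await.ready(system.whenTerminated, 2.seconds)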

I'm not entirely sure why the LogRotatorSink does that, but it also does not sound like a perfect match for what you want to do. Its purpose is closer to how system logs work on Unix systems: you append multiple elements (lines) to the same file, but you also make sure that an individual file does not grow without bound and is closed after a while, so that it can be compressed and cleaned up later.

To write each element to a separate file, and keep backpressure tied to how fast each file can be written, you could instead use something like:

Source(1 to 4)
  .mapAsync(1) { n =>
    val path = Path.of(s"/tmp/file${n}.txt")
    // Inner stream: write this single element to its own file.
    Source.single(ByteString(n.toString))
      .toMat(FileIO.toPath(path))(Keep.right)
      .run()
  }
  .run() // run the outer stream

Note that a ByteString does not, in general, represent a bounded unit of bytes such as a line. It is usually an arbitrary chunk of a longer stream of bytes that only represents a complete thing as a whole (for example the bytes coming from the network stream operators, the file operators, or an Akka HTTP request or response body). So make sure it makes sense in your case to treat each ByteString as one complete element.
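To illustrate the point about chunk boundaries, here is a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem also provides the materializer). The object name FramingSketch, the method frameLines, and the sample chunks are made up for illustration; Framing.delimiter is the standard Akka Streams operator for re-chunking bytes into delimiter-separated frames.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FramingSketch {

  // Re-chunks arbitrary ByteString chunks into one String per newline-terminated line.
  def frameLines(chunks: immutable.Seq[ByteString])(
      implicit system: ActorSystem): Future[immutable.Seq[String]] =
    Source(chunks)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .map(_.utf8String)
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("framing-sketch")
    try {
      // Chunks as they might arrive from a file or socket:
      // the chunk boundaries do not line up with the line boundaries.
      val chunks = List(ByteString("al"), ByteString("pha\nbe"), ByteString("ta\n"))
      val lines = Await.result(frameLines(chunks), 3.seconds)
      lines.foreach(println) // two lines: alpha, beta
    } finally system.terminate()
  }
}
```

Three incoming chunks become two framed elements here, so "one ByteString per file" only gives meaningful files if the elements were framed (or created one per logical item, as in the question) upstream.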

I guess writing the files in mapAsync works, but I wanted something more idiomatic to streams.
Thanks for the explanations.

I would not call this less idiomatic.

Performance-wise, the LogRotatorSink still needs to materialize each new sink that is returned from the factory, so it will likely be roughly the same.

This is very strange. I checked the code, and it should write the content and complete the materialized value only after the file is closed. Or at least this is a really odd corner case that we never ran into. I think the main problem is with the termination: after the Await.ready, you should allow a bit more time for the content to be written to the sink before terminating. If that solves the problem, then you have found a bug. (I will check this.)

Writing with mapAsync can be faster, because you can tune the parallelism.
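As a rough sketch of what tuning the parallelism could look like, assuming Akka 2.6+ and a temporary output directory: the object name ParallelWriteSketch, the method writeEach, and the file naming are hypothetical, and the value 4 is only an example, not a recommendation.

```scala
import java.nio.file.{Files, Path}

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ParallelWriteSketch {

  // Writes every element to its own file, with at most `parallelism` writes in flight.
  // mapAsync still emits results in upstream order and backpressures when all slots are busy.
  def writeEach(dir: Path, elements: immutable.Seq[Int], parallelism: Int)(
      implicit system: ActorSystem): Future[Done] =
    Source(elements)
      .mapAsync(parallelism) { n =>
        // Inner stream: one element, one file. The Future completes once the file is written and closed.
        Source.single(ByteString(n.toString))
          .runWith(FileIO.toPath(dir.resolve(s"file${n}.txt")))
      }
      .runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("parallel-write-sketch")
    try {
      val dir = Files.createTempDirectory("elements")
      Await.result(writeEach(dir, 1 to 4, parallelism = 4), 5.seconds)
      println(s"Wrote files to $dir")
    } finally system.terminate()
  }
}
```

With parallelism = 1 this behaves like the earlier mapAsync(1) suggestion; higher values let several file writes overlap, which mostly helps when the individual writes are slow.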

I did not see any effect from increasing the timeout.
I have already opened an issue: "LogRotatorSink not handling correctly the last element (before completion)", Issue #2553 in akka/alpakka on GitHub.

I started a PR, but it is not as easy to fix as I thought (or at least my knowledge and understanding ran out at some point).