Stream running multiple elements in parallel instead of sequentially

Hello all

I have a flow that is fed a continuous stream of elements, one per second. In this flow every element expands to a source that is flatMapped into the actual elements. Processing these elements can take longer than the one second until the next element arrives (in the following example this is deliberately provoked). It is critical that the flow does not accept any more elements from upstream until all of those elements have been processed.

This is my code right now:

    private Flow<NotUsed, Path, ?> downloadFilesToTempFolder() {
        return Flow.of(NotUsed.class)
                .wireTap(notUsed -> log.warn("NEXT_ELEMENT"))
                .flatMapConcat(notUsed -> Sftp.ls(folder, settings))
                .throttle(1, Duration.ofMillis(800))
                .mapAsync(1, (file) -> {
                    Path tempPath = Paths.get(tempDirectory, file.name());
                    Source<ByteString, ?> fileSource = fromPath(file.path());
                    Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toFile(tempPath.toFile());
                    log.error("Begin file: {}", file.path());
                    return fileSource.runWith(fileSink, materializer)
                            .thenCompose(result -> {
                                log.warn("Deleting file: {}", file.path());
                                return Source.single(file).runWith(removeSink(), materializer);
                            })
                            .thenApply(result -> {
                                log.warn("Done with file: {}", file.path());
                                return tempPath;
                            });
                });
    }
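
For reference, removeSink() and fromPath() are just thin helpers around the Alpakka Sftp javadsl, roughly along these lines (a sketch, the actual bodies are not shown here):

    // Sketch: sink that deletes the FtpFile it receives on the server
    private Sink<FtpFile, CompletionStage<IOResult>> removeSink() {
        return Sftp.remove(settings);
    }

    // Sketch: source that streams the bytes of a remote file
    private Source<ByteString, CompletionStage<IOResult>> fromPath(String path) {
        return Sftp.fromPath(path, settings);
    }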

The flow is triggered by this source:

        return Source.repeat(NotUsed.notUsed())
                .throttle(1, scanFrequency)
                .via(downloadFilesToTempFolder());
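
For completeness, this source is then run with a sink that picks up the downloaded paths, something like this (Sink.foreach here is just a stand-in for the real processing):

    Source.repeat(NotUsed.notUsed())
            .throttle(1, scanFrequency)
            .via(downloadFilesToTempFolder())
            .runWith(Sink.foreach(path -> log.info("Downloaded {}", path)), materializer);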

Here is the output of a sample run:

    09:16:46.764 NEXT_ELEMENT
    09:16:47.764 NEXT_ELEMENT
    09:16:48.754 NEXT_ELEMENT

    09:16:48.837 Begin file: /home/docker/file-service/input/f1
    09:16:48.931 Deleting file: /home/docker/file-service/input/f1
    09:16:49.006 Done with file: /home/docker/file-service/input/f1

    09:16:49.562 Begin file: /home/docker/file-service/input/f2
    09:16:49.634 Deleting file: /home/docker/file-service/input/f2
    09:16:49.700 Done with file: /home/docker/file-service/input/f2

    09:16:50.352 Begin file: /home/docker/file-service/input/f3
    09:16:50.430 Deleting file: /home/docker/file-service/input/f3
    09:16:50.515 Done with file: /home/docker/file-service/input/f3

    09:16:51.162 Begin file: /home/docker/file-service/input/f4
    09:16:51.233 Deleting file: /home/docker/file-service/input/f4
    09:16:51.309 Done with file: /home/docker/file-service/input/f4

    09:16:51.952 Begin file: /home/docker/file-service/input/f5
    09:16:52.027 Deleting file: /home/docker/file-service/input/f5
    09:16:52.103 Done with file: /home/docker/file-service/input/f5

    09:16:52.106 NEXT_ELEMENT

    09:16:52.762 Begin file: /home/docker/file-service/input/f6
    09:16:52.835 Deleting file: /home/docker/file-service/input/f6
    09:16:52.919 Done with file: /home/docker/file-service/input/f6

    09:16:52.922 NEXT_ELEMENT

    09:16:53.562 Begin file: /home/docker/file-service/input/f6

As you can see, at first an element from upstream is consumed every second and nothing is done, because no files have been added to the SFTP server yet (i.e. Sftp.ls returns an empty source). Later files become available and processing starts. Due to the intentional throttling this takes quite a while, and as expected no more NEXT_ELEMENT lines appear for a while. However, a new “token” from upstream is consumed at 09:16:52, quite a while before the last element of the previous run is done. Sometimes this is even interleaved with the processing of the last element (for example “Begin file, NEXT_ELEMENT, Deleting file, Done with file”).

I tried several other versions before arriving at the current state, including adding a buffer of one element to the flow and adding a conflate step after the throttling in the source. However, no matter what I try, as soon as the last element is picked up by the mapAsync the stream starts accepting the next element from upstream instead of waiting for the last one to complete.
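
Roughly, those variants looked like this (just a sketch; the exact placement of the operators may have been a bit different):

    return Source.repeat(NotUsed.notUsed())
            .throttle(1, scanFrequency)
            .conflate((older, newer) -> newer)                    // collapse ticks while downstream is busy
            .via(downloadFilesToTempFolder()
                    .buffer(1, OverflowStrategy.backpressure())); // at most one element in flight in the flow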

I am confused about why this happens. Should the flow not wait until the last element has been processed before accepting new elements from upstream?

BR
Yanick

I was able to get the behavior I need by moving the concat operation to a different layer (some log output removed):

    private Flow<NotUsed, Path, ?> downloadFilesToTempFolder() {
        return Flow.of(NotUsed.class)
                .wireTap(notUsed -> log.warn("NEXT_ELEMENT"))
                .map(notUsed -> Sftp.ls(folder, settings))
                .flatMapConcat((files) -> files.mapAsync(this.parallelism, file -> {
                    Path tempPath = Paths.get(tempDirectory, file.name());
                    Source<ByteString, ?> fileSource = fromPath(file.path());
                    Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toFile(tempPath.toFile());
                    return fileSource.runWith(fileSink, materializer)
                            .thenCompose(result -> Source.single(file).runWith(removeSink(), materializer))
                            .thenApply(result -> tempPath);
                }));
    }

This is the output now:

    10:28:02.268 NEXT_ELEMENT
    10:28:24.935 NEXT_ELEMENT
    10:28:26.052 NEXT_ELEMENT

Notice that more than 20 seconds pass between the first and the second NEXT_ELEMENT, which is how long it took to download all the files.

I don't know if that is the right way to do it, but the result is OK 🙂
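
If I understand it correctly, this works because the inner source handed to flatMapConcat now only completes once all the download futures have finished, so flatMapConcat does not pull the next NotUsed from upstream until then. An equivalent formulation might be to run the whole scan as one future inside mapAsync(1) and flatten the result afterwards (untested sketch; downloadToTempAndRemove is a made-up helper that downloads a single file, removes it on the server and returns the temp path):

    private Flow<NotUsed, Path, NotUsed> downloadFilesToTempFolderAlt() {
        return Flow.of(NotUsed.class)
                .mapAsync(1, notUsed ->
                        Sftp.ls(folder, settings)
                                .mapAsync(this.parallelism, this::downloadToTempAndRemove)
                                .runWith(Sink.seq(), materializer)) // CompletionStage<List<Path>>
                .mapConcat(paths -> paths);                         // flatten the list back into single Paths
    }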
