Run stream of elements through substream dynamic per element

(Yanick Salzmann) #1

Hello all

I am having a bit of a hard time finding a appropriate title, if anyone has a better idea after reading my problem description feel free to suggest it :slight_smile:

Right now I am having a Source<FtpFile, ?> fileSource that I want to transform. More specifically I want to download those files and once the download is complete I would like to continue working with the file (but now the local file). I am using Alpakka for the I/O operations. Using fileSource.flatMapConcat(file -> Sftp.fromFile(file, settings)) I can transform all files that are emitted by the Source into a continuous stream of ByteString. So this is not very helpful, since I need to know the boundaries of each file.

In some sort of pseudo code what I would like to do is the following:

Source<FtpFile, ?> fileSource = Source.repeat(this)
                                      .throttle(1, frequency)
                                      .flatMapConcat(elem ->, settings));

// pseudo code:
// fileSource.fanOut(file ->, settings))
//           .fanIn(file -> FileIO.toPath(getTempFile(file.path()))
//           .map(file -> Paths.get(getTempFile(file.path());
// ...

I hope it is understandable what I am trying to achieve. Right now I am always facing problems, that I have a source and a sink that has to be created for each element passing through the stream and there I always fail to create a graph, because it expects the Sink to be created when modelling the graph and not on a per element basis.

What are alternative ways to achieve this?

Thank you and BR

(Enno) #2

Hi Yanick,

You will want to create an inner flow which transfers the file and runs within a mapAsync.
I believe the webinar I did in August explains how to use this

Or just look at the code from the webinar at


(Yanick Salzmann) #3

Hello @ennru

Thank you for your reply. Incidentally this is also the way I have come up with it in the meantime but I was not sure if that is the right way to do it :slight_smile:, but apparently it is :+1: