Run stream of elements through substream dynamic per element

Hello all

I am having a bit of a hard time finding a appropriate title, if anyone has a better idea after reading my problem description feel free to suggest it :slight_smile:

Right now I am having a Source<FtpFile, ?> fileSource that I want to transform. More specifically I want to download those files and once the download is complete I would like to continue working with the file (but now the local file). I am using Alpakka for the I/O operations. Using fileSource.flatMapConcat(file -> Sftp.fromFile(file, settings)) I can transform all files that are emitted by the Source into a continuous stream of ByteString. So this is not very helpful, since I need to know the boundaries of each file.

In some sort of pseudo code what I would like to do is the following:

Source<FtpFile, ?> fileSource = Source.repeat(this)
                                      .throttle(1, frequency)
                                      .flatMapConcat(elem -> Sftp.ls(scanFolder, settings));

// pseudo code:
// fileSource.fanOut(file -> Sftp.download(file.path(), settings))
//           .fanIn(file -> FileIO.toPath(getTempFile(file.path()))
//           .map(file -> Paths.get(getTempFile(file.path());
// ...

I hope it is understandable what I am trying to achieve. Right now I am always facing problems, that I have a source and a sink that has to be created for each element passing through the stream and there I always fail to create a graph, because it expects the Sink to be created when modelling the graph and not on a per element basis.

What are alternative ways to achieve this?

Thank you and BR
Yanick

Hi Yanick,

You will want to create an inner flow which transfers the file and runs within a mapAsync.
I believe the webinar I did in August explains how to use this https://www.lightbend.com/blog/pakk-your-alpakka-reactive-streams-integrations-for-aws-azure-google-cloud

Or just look at the code from the webinar at https://github.com/akka/alpakka-samples/tree/master/alpakka-sample-mqtt-http-to-s3-java

Cheers,
Enno.

1 Like

Hello @ennru

Thank you for your reply. Incidentally this is also the way I have come up with it in the meantime but I was not sure if that is the right way to do it :slight_smile:, but apparently it is :+1:

BR
Yanick