Run each element of a stream through a dynamically created substream


(Yanick Salzmann) #1

Hello all

I am having a hard time finding an appropriate title. If anyone has a better idea after reading my problem description, feel free to suggest it :slight_smile:

I currently have a Source<FtpFile, ?> fileSource that I want to transform. More specifically, I want to download each file and, once its download is complete, continue working with the local copy. I am using Alpakka for the I/O operations. Using fileSource.flatMapConcat(file -> Sftp.fromPath(file.path(), settings)) I can turn all files emitted by the Source into one continuous stream of ByteString. That is not very helpful, because I need to know the boundaries of each file.

In pseudo code, what I would like to do is the following:

Source<FtpFile, ?> fileSource = Source.repeat(this)
                                      .throttle(1, frequency)
                                      .flatMapConcat(elem -> Sftp.ls(scanFolder, settings));

// pseudo code:
// fileSource.fanOut(file -> Sftp.download(file.path(), settings))
//           .fanIn(file -> FileIO.toPath(getTempFile(file.path())))
//           .map(file -> Paths.get(getTempFile(file.path())));
// ...

I hope it is clear what I am trying to achieve. The problem I keep running into is that a source and a sink have to be created for each element passing through the stream. I always fail to build the graph, because it expects the Sink to exist when the graph is modelled, not to be created on a per-element basis.

What are alternative ways to achieve this?

Thank you and BR
Yanick


(Enno) #2

Hi Yanick,

You will want to create an inner flow which transfers the file and runs within a mapAsync.
I believe the webinar I did in August explains how to use this: https://www.lightbend.com/blog/pakk-your-alpakka-reactive-streams-integrations-for-aws-azure-google-cloud

Or just look at the code from the webinar at https://github.com/akka/alpakka-samples/tree/master/alpakka-sample-mqtt-http-to-s3-java
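
To illustrate the shape of this approach without any Akka dependency, here is a minimal JDK-only sketch. It is an analogy, not Alpakka code: each element gets its own short-lived "source to sink" pipeline that completes with the local path, and the outer loop waits for that completion before moving on, which is roughly what mapAsync with a parallelism of 1 does. The class PerElementDownload and the helpers openRemote, download and downloadAll are hypothetical names invented for this sketch. In an actual Akka Streams graph, the inner pipeline would presumably look something like Sftp.fromPath(file.path(), settings).runWith(FileIO.toPath(target), materializer), whose CompletionStage is what mapAsync consumes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class PerElementDownload {

    // Hypothetical stand-in for opening a remote file; a real setup would
    // read from the SFTP server instead of fabricating the bytes.
    static InputStream openRemote(String remotePath) {
        byte[] bytes = ("content of " + remotePath).getBytes(StandardCharsets.UTF_8);
        return new ByteArrayInputStream(bytes);
    }

    // The "inner stream": source and sink are created per element, and the
    // returned stage completes with the local path once the copy is done.
    static CompletionStage<Path> download(String remotePath, Path targetDir) {
        return CompletableFuture.supplyAsync(() -> {
            Path local = targetDir.resolve(Paths.get(remotePath).getFileName().toString());
            try (InputStream in = openRemote(remotePath)) {
                Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING);
                return local;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    // The "outer stream": one element at a time, in order, each waiting for
    // its own inner pipeline to finish (the mapAsync(1, ...) analogue).
    static List<Path> downloadAll(List<String> remotePaths, Path targetDir) {
        List<Path> locals = new ArrayList<>();
        for (String remotePath : remotePaths) {
            locals.add(download(remotePath, targetDir).toCompletableFuture().join());
        }
        return locals;
    }
}
```

Because every element yields exactly one completed Path, file boundaries are preserved, and downstream stages can work with the local files rather than a merged stream of bytes.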

Cheers,
Enno.


(Yanick Salzmann) #3

Hello @ennru

Thank you for your reply. Incidentally, this is also the approach I came up with in the meantime, but I was not sure whether it was the right way to do it :slight_smile:. Apparently it is :+1:

BR
Yanick