How to wait for file to write before reading it again for further processing

Hi,
I am new to Akka and Alpakka, so I would appreciate any help.
I am trying to save a tar.gz file, then unpack it and save the unpacked files, then read one of the unpacked files for further processing. This means that somewhere the system has to wait for the tar.gz to be written and for the unpacking to finish before the file can be read again.
I am stuck on how to wait for this (without using Await).
The structure looks something like this:

Source
.via(download the tar.gz file and save it to disc)
.via(untarFiles and save to disc)
.via(read one of the unpacked files and process)
.toMat(Sink.foreach(println))(Keep.both).run()

With a few more comments:
Source
.via(download the tar.gz file and save) // save the file using: dataBytes.runWith(FileIO.toPath(new File(tarFileLocation).toPath)) which returns a Future[IOResult].
.via(untarFiles and save to disc) // following the "File" page of the Alpakka documentation, but the problem is that the tar.gz file has not finished writing yet and hence does not exist yet. I also need to pass on the file name of the unpacked file so it can be read in the next step.
.via(read one of the unpacked files and process)//the unpacked file is not there yet as the untarFile has not finished yet
.toMat(Sink.foreach(println))(Keep.both).run()

My current solution looks like this:

Source
.via(download the tar.gz file and save it to disc) //returns (Future[IOResult], String)
.toMat(Sink.foreach(x => x._1.onComplete { //x: (Future[IOResult], String), wait for .tar.gzip file to be saved
case Success(s) =>
Source.single(x._2.zipFilePath).via(untarFile).runWith(Sink.foreach(x1 => x1._1.onComplete { //wait for untar to finish
case Success(s1) => Source.single(x1._2).via(processFile).runWith(Sink.foreach(println))
case Failure(e1) => println(e1); throw e1
}))
case Failure(e) => println(e); throw e
}))(Keep.both).run()

This does “feel” wrong somehow, waiting for the completion in the Sink.

Thank you very much!

Futures as elements inside a stream are rarely what you want. Instead, you can use .mapAsync(1)(I => Future[O]) to emit the O result downstream once the Future completes. If the Future fails, the stream fails.
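As a minimal sketch of that idea, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream): `slowLength` is a hypothetical stand-in for any step that returns a Future, such as saving a file. Downstream operators see plain Int values, never a Future.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

object MapAsyncSketch {
  // Hypothetical asynchronous step; stands in for e.g. writing a file.
  def slowLength(s: String)(implicit ec: ExecutionContext): Future[Int] =
    Future(s.length)

  def run()(implicit system: ActorSystem): Future[Seq[Int]] = {
    import system.dispatcher // execution context for the Future above
    Source(List("a", "bb", "ccc"))
      // With parallelism 1, each Future is awaited (without blocking a thread)
      // before the next element is processed; the result is emitted downstream.
      .mapAsync(1)(s => slowLength(s))
      .map(n => n * 2) // n is a plain Int here, not a Future[Int]
      .runWith(Sink.seq)
  }
}
```

The nested onComplete callbacks in the Sink are replaced by ordinary stream stages, and a failed Future fails the whole stream instead of being thrown inside a callback.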

It can be done in many ways. Some pseudo code of one possible way:

Source(bunch_of_urls_to_tgz_files)
  .mapAsync(1)(url => ... http request/returning entire unmarshalled Future(http response body)) // Source has no plain flatMap operator
  .via(Compression.gunzip()) // not going via disk at all, gunzip directly
  .via(Archive.tarReader()) // then untar
  .mapAsync(1) { case (metadata, source) =>
    // process the tar file entries just as in the example in the docs over at
    // https://doc.akka.io/docs/alpakka/current/file.html#reading-tar-archives
    Future(file or directory you created)
  }
  .map(file => ...) // maybe do some other processing of that file, or logging
  .runWith(Sink.ignore) // or just consume each and ignore it

It is also possible to avoid loading the full response body into memory first: combine mapAsync for the response with flatMapConcat to consume the response body in a streaming fashion through the other operators.
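A rough sketch of that streaming variant, assuming Akka 2.6+ and a recent Alpakka File module (for Archive.tarReader and TarArchiveMetadata.isDirectory). The `fetch` function is hypothetical: it stands for whatever HTTP client call gives you the response body as a Source[ByteString, _]. `targetDir` is likewise an illustrative parameter.

```scala
import java.nio.file.{Files, Path}
import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.scaladsl.{Compression, FileIO, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object StreamingUntar {
  def unpackAll(
      urls: List[String],
      fetch: String => Future[Source[ByteString, Any]], // hypothetical HTTP call
      targetDir: Path
  )(implicit system: ActorSystem): Future[Seq[Path]] = {
    import system.dispatcher
    Source(urls)
      .mapAsync(1)(fetch) // wait for the response, get its body as a stream
      // Gunzip and untar the body as it arrives, without buffering it fully.
      .flatMapConcat(body => body.via(Compression.gunzip()).via(Archive.tarReader()))
      .mapAsync(1) { case (metadata, entryBytes) =>
        val target = targetDir.resolve(metadata.filePath)
        if (metadata.isDirectory)
          // Drain the (empty) entry so the reader can move on, then create the directory.
          entryBytes.runWith(Sink.ignore).map { _ => Files.createDirectories(target); target }
        else {
          Files.createDirectories(target.getParent)
          // The Future completes only once the entry is fully written.
          entryBytes.runWith(FileIO.toPath(target)).map(_ => target)
        }
      }
      .runWith(Sink.seq) // paths of everything that was unpacked, in archive order
  }
}
```

Each Path is emitted only after its file has been written, so a later stage can safely read it.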

I hope this gave you some ideas

Hi,
thank you!
How would I wait for the .tar.gz file to be saved before untarring it? I understand that it might not be necessary, but I am interested in how to wait.
In .map(file => …) the file is a future, no? So would it be .map(file => file.onComplete(do something))?

If you have a function that can stream the file to disk and then complete a future, you can use .mapAsync(1)(...) to turn that completed future value into an element in the stream, once it is ready, so that you can do further processing on it in the same stream.
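For illustration, here is one way such a function could look, assuming Akka 2.6+. The names `save` and `run` and the shape of the `downloads` input are made up for this sketch. `save` streams bytes to disk and returns a Future that completes with the path once the write has finished; `mapAsync(1)` then puts that path into the stream, and the next stage reads the file back.

```scala
import java.nio.file.Path
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object SaveThenRead {
  // Hypothetical helper: completes with `target` only after all bytes are on disk.
  def save(bytes: Source[ByteString, Any], target: Path)(
      implicit system: ActorSystem): Future[Path] = {
    import system.dispatcher
    bytes.runWith(FileIO.toPath(target)).map(_ => target) // Future[IOResult] => Future[Path]
  }

  def run(downloads: List[(Source[ByteString, Any], Path)])(
      implicit system: ActorSystem): Future[Seq[String]] =
    Source(downloads)
      .mapAsync(1) { case (bytes, target) => save(bytes, target) }
      // The path arrives here only after the write completed, so reading is safe.
      .flatMapConcat(path => FileIO.fromPath(path).fold(ByteString.empty)(_ ++ _))
      .map(_.utf8String)
      .runWith(Sink.seq)
}
```

No Await and no onComplete are needed: the waiting is expressed by mapAsync itself.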

Thank you!
If you have a function that can stream the file to disk and then complete a future:
I’m stuck on how to complete the future. Saving returns a Future[IOResult], not an IOResult. How do I complete the future?

That IOResult will end up in the stream once the future completes if you use .mapAsync(1)(logicThatReturnsFutureIOResult())

Thank you so much for unblocking my brain!