Newbie question about Alpakka S3

alpakka
streams

(K) #1

Hello,

I have been trying to use Alpakka S3 to perform a rather straightforward operation:

  1. List all files on a given bucket
  2. Download all of them and parse content

It seems to me that the resulting code is unwieldy and complicated, with nested Sources. Is there a way to simplify or flatten the structure somehow? Am I missing something obvious?

S3.download now returns a Source[Option[Source[ByteString]]]. Is there an easy way to flatten it into just a Source[ByteString], since I already know that the file exists?

So e.g. I would like to flatten:
val file: Source[Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed], NotUsed] =
  S3.listBucket(conf.bucketName, Some(conf.prefixFilter))
    .map(r => S3.download(r.bucketName, r.key))
to something like:

Source[ByteString, NotUsed]

Any thoughts are welcome,
Regards,
K


(Gergő Törcsvári) #2

Yes, there are some options.

val file: Source[Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed], NotUsed] =
  S3.listBucket(conf.bucketName, Some(conf.prefixFilter)) 
  .map(r => S3.download(r.bucketName, r.key))

With flatMapConcat (https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/flatMapConcat.html)
this can become something like:

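val f: Source[ByteString, NotUsed] = S3.listBucket(bucketName, None)
    .map(r => S3.download(r.bucketName, r.key))
    // flatten the Source of Sources, keeping the listing order
    .flatMapConcat(identity)
    // keep only objects that exist and unwrap the Option
    .collect { case Some(x) => x }
    // keep the data stream, drop the ObjectMetadata
    .flatMapConcat { case (s, _) => s }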
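
If you want to try the flattening without an S3 bucket, here is a minimal sketch of the same pattern on in-memory sources. FlattenSketch, FakeMetadata, fakeList and fakeDownload are made-up names that only mimic the shapes of S3.listBucket and S3.download; they are not part of Alpakka. It assumes akka-stream 2.5 on the classpath (on 2.6+ the ActorMaterializer line should not be needed).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenSketch {
  // Hypothetical stand-in for ObjectMetadata.
  final case class FakeMetadata(key: String)

  // Hypothetical stand-in for S3.listBucket: emits object keys.
  def fakeList(): Source[String, NotUsed] =
    Source(List("a.txt", "missing.txt", "b.txt"))

  // Hypothetical stand-in for S3.download: same nested shape,
  // emitting None for an object that does not exist.
  def fakeDownload(key: String): Source[Option[(Source[ByteString, NotUsed], FakeMetadata)], NotUsed] =
    if (key == "missing.txt") Source.single(None)
    else {
      val data = Source(List(ByteString(key + ":1;"), ByteString(key + ":2;")))
      Source.single(Some((data, FakeMetadata(key))))
    }

  // The flattening itself: list, download, drop missing objects, concatenate bytes.
  val flat: Source[ByteString, NotUsed] =
    fakeList()
      .flatMapConcat(key => fakeDownload(key))
      .collect { case Some((data, _)) => data }
      .flatMapConcat(identity)

  // Runs the stream and returns all chunks joined into one string.
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("flatten-sketch")
    // Required on Akka 2.5; assumed unnecessary on 2.6+.
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val chunks = Await.result(flat.runWith(Sink.seq), 5.seconds)
      chunks.map(_.utf8String).mkString
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints a.txt:1;a.txt:2;b.txt:1;b.txt:2;
}
```

Note that the flattened Source[ByteString, NotUsed] no longer tells you where one file ends and the next begins. If you need to parse per file, it may be easier to do the parsing on each inner source before the last flatMapConcat.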