Alpakka S3 Dynamic Bucket Key?

Hello there,
I’m using akka-grpc with the client-streaming, single-response pattern and attempting to persist the streamed client requests to S3 via the Alpakka S3 connector as a multipart upload.

Most of the documentation examples of using the S3 connector this way look like this:

final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink =
    S3.multipartUpload(bucket(), bucketKey());

My issue is that I have an unbounded stream coming in from the client and the Protobuf requests themselves contain properties that I would like to use in my bucketKey() creation, such as the id field from this Proto:

message MyRequest {
  string id = 1;
  bytes payload = 2;
}

Here is the proto service descriptor:

rpc MyStream (stream MyRequest) returns (MyResponse) {}

And here is my implementation of the generated service interface:

@Override
public CompletionStage<MyResponse> myStream(Source<MyRequest, NotUsed> in) {
    System.out.println("Processing inbound stream " + in.hashCode() + "...");

    in.map(request -> ByteString.fromArray(request.getPayload().toByteArray()))
            .to(S3.multipartUpload(s3Bucket, "s3Key???")) // <-- How can I set s3Key from request.getId()?
            .run(system);

    return CompletableFuture.completedFuture(MyResponse.getDefaultInstance()); // placeholder response
}

Is there a clean way to grab the first element of the stream and pass its properties to the S3.multipartUpload method, or is there a pattern that I might be missing?

Thanks!

I’m not sure that I understand your problem, but you can do something like this:

case class MyRequest(bucket: String, key: String, data: ByteString)
in.mapAsync(uploadParallelism)(request => 
  Source.single(request.data).runWith(S3.multipartUpload(request.bucket, request.key))
).runWith(Sink.ignore)

Of course the Java syntax is a bit different and you’ll probably need to make the types match, but the basic idea is to use mapAsync or mapAsyncUnordered with an upload parallelism and fire up a substream for every request; a Java sketch of the same idea is below.
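In Java it would look roughly like this. This is an untested sketch against your MyRequest proto: uploadParallelism is an arbitrary value you’d tune yourself, s3Bucket and system are the fields from your snippet, and the MyResponse.getDefaultInstance() at the end is just a placeholder response:

import akka.NotUsed;
import akka.stream.alpakka.s3.javadsl.S3;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;

@Override
public CompletionStage<MyResponse> myStream(Source<MyRequest, NotUsed> in) {
    int uploadParallelism = 4; // assumption: tune to your S3 throughput

    return in
        // fire up a substream per request; the S3 key comes from the request itself
        .mapAsync(uploadParallelism, request ->
            Source.single(ByteString.fromArray(request.getPayload().toByteArray()))
                .runWith(S3.multipartUpload(s3Bucket, request.getId()), system))
        // drain the upload results, then answer the client once the stream completes
        .runWith(Sink.ignore(), system)
        .thenApply(done -> MyResponse.getDefaultInstance()); // placeholder response
}

Note that this makes every incoming request its own S3 object; if several requests can share the same id, you’d want to group them (e.g. with groupBy) before uploading.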

Thanks @tg44, this is exactly what I needed; the substream approach is working for me!