Hello there,
I’m using akka-grpc
in the streaming client, single response pattern and attempting to persist the streaming client requests into S3 via the Alpakka S3 connector as a MultipartUpload.
Most of the documentation examples of using the S3 connector in this way show usage as:
final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink =
S3.multipartUpload(bucket(), bucketKey());
My issue is that I have an unbounded stream coming in from the client and the Protobuf requests themselves contain properties that I would like to use in my bucketKey()
creation, such as the id
field from this Proto:
message MyRequest {
string id= 1;
bytes payload= 2;
}
Here is the proto service descriptor:
rpc MyStream (stream MyRequest) returns (MyResponse) {}
And here is the service implementation from the proto:
@Override
public CompletionStage<MyResponse> myStream(Source<MyRequest, NotUsed> in) {
System.out.println("Processing inbound stream " + in.hashCode() + "...");
in.map(request -> ByteString.fromArray(request.getPayload().toByteArray()))
.to(S3.multipartUpload(s3Bucket,"s3Key???")) // <-- How can I set s3Key from request.getId()?
.run(system);
}
Is there a clean way to grab maybe the first element in the stream and pass it’s properties to the S3.multipartUpload
method or is there a pattern that I might be missing?
Thanks!