Committing a Kafka cursor after chunk upload on multipart S3 upload

I am in the process of creating a backup tool for Kafka that persists Kafka messages in an AWS S3 bucket. One of the goals of the tool is to be stateless, so we use the AWS S3 API to handle state management (i.e. the API gives us the latest uploaded chunks, so we can keep track of the multipart upload without needing to store state locally).

The way the tool currently works is that it continuously uploads Kafka messages, consumed via Alpakka Kafka’s Consumer.sourceWithContext, to the S3 multipart upload API via Alpakka’s S3 module (i.e. S3.multipartUpload). The idea is that I only want to commit the Kafka cursor after a single chunk of the multipart upload has been persisted to S3 (more specifically, I want to commit the latest offset for each partition once the chunk has been saved to S3). This is what makes the tool stateless: if someone shuts it down halfway through uploading a chunk, then when they restart the tool Kafka will resend the messages from just after the latest uploaded chunk, and we resume from there.
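To make the current setup concrete, here is a minimal sketch of the pipeline as described above, assuming Akka 2.6 with the Alpakka Kafka and Alpakka S3 connectors on the classpath; the topic, bucket, key, group id, and bootstrap servers are placeholders, not values from the actual tool:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.util.ByteString
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import scala.concurrent.Future

object BackupPipelineSketch {
  implicit val system: ActorSystem = ActorSystem("kafka-s3-backup")

  val consumerSettings: ConsumerSettings[Array[Byte], Array[Byte]] =
    ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("backup-tool")             // placeholder

  // Each element carries its CommittableOffset as context, but
  // S3.multipartUpload only accepts ByteString, so the context has to be
  // dropped before the sink -- which is exactly the problem: there is no
  // hook to commit the offsets once a chunk has been persisted.
  val upload: Future[MultipartUploadResult] =
    Consumer
      .sourceWithContext(consumerSettings, Subscriptions.topics("my-topic"))
      .map(record => ByteString(record.value()))
      .asSource                          // Source[(ByteString, CommittableOffset), Control]
      .map { case (bytes, _) => bytes }  // the commit context is lost here
      .runWith(S3.multipartUpload("my-backup-bucket", "backup.bin"))
}
```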

Herein lies the problem: as far as I understand, this is currently not really possible because the Alpakka S3 API abstracts away all of the chunk/part handling, so there is no way for the SourceWithContext that carries the Kafka commit data to know when a chunk/part has actually been uploaded to S3. In summary, I have two issues at hand that I need advice on:

  • I need to submit a PR to Alpakka’s S3 module that adjusts the API so that I know when a chunk has been successfully uploaded to S3. The question here is what the recommended API adjustments would be, i.e. should I make a new method in S3 along the lines of
    def multipartUploadWithContext[T](
        s3Location: S3Location,
        contentType: ContentType = ContentTypes.`application/octet-stream`,
        s3Headers: S3Headers,
        chunkSize: Int = MinChunkSize,
        chunkingParallelism: Int = 4,
        chunkUploadSink: UploadPartResponse => Sink[T, NotUsed]
    ): Sink[(ByteString, T), Future[MultipartUploadResult]]
    where T is some user-provided data (in my case the Kafka commit context) and chunkUploadSink is a user-provided function that takes the response of the chunk upload as input and returns a Sink that defines what to do with that T (i.e. commit the cursor to Kafka). I created an issue on Alpakka at S3: Suggestions on how to modify multipart upload to signal successful chunk upload · Issue #2760 · akka/alpakka · GitHub for specific discussion on this.
  • What is the best way to handle updating the cursor context data as Kafka messages are received? Basically what I want is a map-like data structure, i.e. Map[Partition, CommitInfo], that is continuously updated as I receive Kafka messages: if I receive a Kafka message whose commit cursor is higher than the one currently stored for the given partition in Map[Partition, CommitInfo] (or if no entry exists yet), then I update/insert it. This Map is going to be the T in the multipartUploadWithContext suggestion from the previous point, so that when a chunk is successfully uploaded to S3, the Map will contain the latest commit for each partition, which I will commit; I then need to empty the Map, and the process repeats as further Kafka messages are consumed. This processing will basically need to take Alpakka Kafka’s CommittableOffset as input to create and update this map-like data structure in place. From what I understand, Alpakka Kafka already has a batching mechanism for commits, so I may not have to manually create this map-like data structure; however, I would need precise control over the batching (i.e. batch until an S3 chunk is uploaded and/or the multipart upload has been finished).
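The “latest offset wins” bookkeeping in the second point can be sketched as a small pure function. Partition and CommitInfo here are illustrative stand-ins (in the real tool they would be backed by Alpakka Kafka’s partition and CommittableOffset types), and this sketch ignores the in-place/concurrency aspect:

```scala
// Illustrative stand-ins for the real Kafka types.
final case class Partition(topic: String, partition: Int)
final case class CommitInfo(offset: Long)

object OffsetTracker {
  // Insert the offset if the partition is unseen; otherwise keep
  // whichever offset is higher, so the map always holds the latest
  // commit cursor per partition for the chunk currently in flight.
  def update(
      state: Map[Partition, CommitInfo],
      partition: Partition,
      incoming: CommitInfo
  ): Map[Partition, CommitInfo] =
    state.get(partition) match {
      case Some(existing) if existing.offset >= incoming.offset => state
      case _ => state.updated(partition, incoming)
    }
}
```

After a chunk upload succeeds, the accumulated map would be committed and then reset to empty, restarting the accumulation for the next chunk.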

I have gone ahead and created a draft implementation of this at S3: Add multipart upload with context by mdedetrich · Pull Request #2770 · akka/alpakka · GitHub