Retries with POST/PUT requests


I would like to address an issue in my setup (S3 <- Application <- User): I would like to have retries for PUT requests from the Application to S3.

If I run the application with the default retry settings in the HTTP client, I get “Substream Source cannot be materialized more than once”. If I disable retries, the error is normally either an IOException (broken pipe) or an UnexpectedConnectionClosureException. In both cases the upload fails and the error is propagated to the user. I would like to have retries on the application side as well.

The AWS SDK accepts an InputStream for upload requests. If the input stream supports mark-and-reset with a read limit, i.e. it can buffer some data in memory, and the buffer was big enough when an error occurs, the SDK can reset the input stream and retry the upload.
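To illustrate the mark-and-reset behaviour described above, here is a minimal sketch using only `java.io`. It does not call the AWS SDK; the object name `MarkResetSketch`, the helper `readChunk`, the payload and the read limit of 1024 bytes are all made up for illustration.

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, InputStream}

object MarkResetSketch {
  /** Reads up to `n` bytes and returns them as a String (ASCII payload assumed). */
  def readChunk(in: InputStream, n: Int): String = {
    val buf  = new Array[Byte](n)
    val read = in.read(buf, 0, n)
    if (read <= 0) "" else new String(buf, 0, read, "US-ASCII")
  }

  /** Simulates a first upload attempt, a failure, and a retry from the marked position. */
  def demo(): (String, String) = {
    val payload = "hello world".getBytes("US-ASCII")
    val in      = new BufferedInputStream(new ByteArrayInputStream(payload))

    // Hypothetical read limit: reset() is only guaranteed to work while
    // no more than this many bytes have been read since mark().
    val readLimit = 1024
    in.mark(readLimit)

    val firstAttempt = readChunk(in, 5) // the "failed" attempt consumed 5 bytes
    in.reset()                          // rewind to the marked position
    val secondAttempt = readChunk(in, 5) // the retry sees the same bytes again

    (firstAttempt, secondAttempt)
  }
}
```

If more than `readLimit` bytes are read before the failure, `reset()` is no longer guaranteed to succeed, which is why the retry only works when the buffer was big enough.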

I was looking for something similar with Akka Streams sources, but so far I have failed to come up with a working solution.
I tried prefixAndTail, which does not work: when the request fails, the error/cancel event propagates through the whole graph and I see the same exceptions.
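To make the goal concrete, the semantics being looked for can be sketched on a plain `Iterator` instead of an Akka `Source`. This is only an illustration of the intended behaviour, not a working Akka stage; the class name `ReplayBuffer` and its parameters are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

/** Hypothetical helper: remembers consumed chunks, up to `readLimit` bytes, so they can be replayed. */
final class ReplayBuffer(upstream: Iterator[Array[Byte]], readLimit: Int) {
  private val buffered      = ArrayBuffer.empty[Array[Byte]]
  private var bufferedBytes = 0
  private var overflowed    = false

  /** True while everything consumed so far still fits within the read limit. */
  def canReset: Boolean = !overflowed

  /** Returns an iterator that replays the buffered chunks, then continues with upstream. */
  def reset(): Iterator[Array[Byte]] = {
    require(canReset, "read limit exceeded, cannot replay")
    // Snapshot the buffer so later appends do not affect this replay.
    val replay = buffered.toList.iterator
    val live = new Iterator[Array[Byte]] {
      def hasNext: Boolean = upstream.hasNext
      def next(): Array[Byte] = {
        val chunk = upstream.next()
        if (!overflowed && bufferedBytes + chunk.length <= readLimit) {
          buffered += chunk
          bufferedBytes += chunk.length
        } else {
          // Too much data has passed through: give up on replaying and free memory.
          overflowed = true
          buffered.clear()
        }
        chunk
      }
    }
    replay ++ live
  }
}
```

Each call to `reset()` stands for one upload attempt. Once `canReset` returns false, a retry is no longer possible and the error would have to be propagated to the user.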

Another option I tried is creating a custom Source stage with a SubSinkInlet that reads from the request entity. On error the sub-sink does not receive a cancellation event, but I cannot find a way to recover from the source's out port getting a cancel event.

I.e. something like this:

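    (path("postData") & post) {
      withoutRequestTimeout {
        extractRequestEntity { entity =>
          // ResettableSource is the custom stage described above; it buffers up to readLimit bytes.
          val resettableSource = new ResettableSource(maximumRetries = 1, readLimit = 10 * 1024 * 1024, originalSource = entity.dataBytes)
          complete {
            retry {
              // reset() is meant to return a fresh Source for each attempt.
              val dataSource: Source[ByteString, Any] = resettableSource.reset()
              // Assumption: the request is sent with Http().singleRequest.
              Http().singleRequest(HttpRequest(
                  method = HttpMethods.PUT,
                  uri    = Uri("http://localhost:8081/putData"),
                  entity = HttpEntity(entity.contentType, dataSource)
              ))
            }
          }
        }
      }
    }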

Maybe somebody has recommendations on how I can implement such a feature, or a hint about what I can try next?