I’ve implemented a streaming uploader using a custom body parser for my Play website that can accept uploads of arbitrary size. It’s meant to be used for files ranging from 2 MB to 70 GB. Right now it uses a chunk size of around 1 MB, but I’d like to allow 4–16 MB chunks. When I try to use chunks bigger than 1 MB, I get this error:
akka.http.scaladsl.model.EntityStreamException: HTTP chunk size exceeds the configured limit of 1048576 bytes. How do I fix this?
Here’s my BodyParser implementation:
// Streams an uploaded request body directly into an IRODS-backed writer instead of
// buffering it in memory or a temp file. On success the request's Right value is the
// total number of bytes written; any fault is rendered as an InternalServerError.
//
// NOTE(review): the reported "HTTP chunk size exceeds the configured limit of 1048576
// bytes" error is NOT raised by this code — it comes from Akka HTTP's entity parser.
// Raise `akka.http.parsing.max-chunk-size` (and, if needed, Play's
// `play.server.akka.max-chunk-size`) in application.conf to accept 4–16 MB chunks.
def streamingUploader(experimentName: ExperimentName,
aid: String,
group: String,
fileName: String) = BodyParser { implicit req =>
// Resolve the destination path. Only aid == "0" is implemented.
val path = aid match {
case "0" => (IRODSPath(experimentName) / group / fileName).throwFault
case n => ??? // NOTE(review): any other aid throws NotImplementedError — TODO implement or reject with a 4xx
}
// Look up the experiment (presumably asynchronously — PTryT wraps the result; verify).
val expInfo = pm.tryGetExperimentStubByName(experimentName)
// Require Write permission for the current user on the experiment's owner/team.
val auth = PTryT(expInfo).flatMapF { exp =>
authModule.forceUser(OwnedBy(exp.owner.userID, exp.owningTeam), Write)
}
val future = auth
.flatMapF { _ =>
// Ensure the parent directory chain exists before opening the writer.
logger.info(s"creating directories for $path")
path.getParent.mkDirs()
}
.flatMapF { _ =>
// 4 MiB argument is presumably a writer buffer size — confirm against IRODSPath API.
path.createWriter(4 * 1024 * 1024, overwrite = true)
}
.map { writer =>
logger.info(s"creating sink for $path")
// Fold each incoming ByteString chunk into the writer, threading a
// PTry[(Writer, Long)] so the first write fault short-circuits all
// subsequent chunks (they flow through flatMap on a PFailure and are dropped).
// The Long accumulates the total byte count.
Sink
.fold[PTry[(Writer, Long)], ByteString](PSuccess(writer, 0l)) {
case (mW, bs: ByteString) =>
mW.flatMap {
case (w, bytes) => w.write(bs).map(_ -> (bytes + bs.size))
}
}
.mapMaterializedValue { res =>
// When the stream completes, close the writer and surface the byte count.
// NOTE(review): if `res` is a failed Future (stream aborted with an
// exception rather than a PFailure), this .map never runs and the
// writer is never closed — consider transform/andThen so close happens
// on both outcomes.
PTryT(res).map(_._2).value.map { r =>
writer.close()
r
}
}
}
.map { iSink =>
logger.info(s"creating accumulator for $path")
// Adapt the sink to Play's Accumulator and convert the PTry outcome into
// the Either[Result, A] shape BodyParser requires.
Accumulator(iSink)
.map {
case PFailure(f) =>
Left(InternalServerError(f.faultTrace.mkString("\n")))
case PSuccess(bs) =>
logger.info(s"done")
Right(bs)
}
}
.leftMap { f =>
// Auth / mkdirs / createWriter failed before any body was consumed:
// short-circuit with a completed accumulator carrying the error response.
logger.error(f.faultTrace.mkString("\n"))
//todo: create a fault handler spec, and get the appropriate response with it
Accumulator.done(Left(InternalServerError(f.faultTrace.mkString("\n"))))
}
.fold(identity, identity)
// NOTE(review): blocking a thread for up to 3 minutes inside a BodyParser defeats
// the streaming design and can exhaust the dispatcher under load. Play provides
// Accumulator.flatten(future) (needs an implicit Materializer) precisely for a
// Future[Accumulator] — prefer that over Await.result.
Await.result(future, 3.minutes)
}