Hi all,
I have an HTTP server that uses the fileupload
directive to receive an uploaded large data file, which results in a Source[ByteString, Any], which I would like to then stream to other subscriber HTTP servers.
I am using BroadcastHub to be able to broadcast the data (byteStrings: Source[ByteString, Any]) to the subscribers:
val producer = byteStrings.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()
The problem is that the remote HTTP server receives the request, but not any data, so I think I must be missing a runWith(...)
or something.
This is the method I am trying to use to stream the data to the other HTTP servers:
def distribute(streamName: String, host: String, port: Int, producer: Source[ByteString, Any]): Future[Done] = {
val uri = s"http://$host:$port$transferRoute/$streamName?dist=false"
createUploadRequest(streamName, uri, producer).flatMap { request =>
val responseFuture = Http().singleRequest(request)
responseFuture.onComplete {
case Success(response) =>
println(s"Distributed data to $uri")
response.discardEntityBytes()
case Failure(ex) =>
println(s"Distributing data to $uri failed with $ex")
}
responseFuture.map(_ => Done)
}
}
private def createUploadRequest(streamName: String, uri: Uri, producer: Source[ByteString, Any]): Future[HttpRequest] = {
val bodyPart = Multipart.FormData.BodyPart(streamName,
HttpEntity.IndefiniteLength(ContentTypes.`application/octet-stream`, producer),
Map("fieldName" -> streamName, "filename" -> streamName)
)
val body = FormData(bodyPart)
Marshal(body).to[RequestEntity].map { entity =>
HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
}
}
Any ideas what might be missing?
On a related note: I assume it is more efficient to stream large data files via HTTP POST than to use a remote StreamRef to another actor in the cluster?
Thanks,
Allan