How to stream the result of a fileupload (Source[ByteString, Any]) to another HTTP server

Hi all,

I have an HTTP server that uses the fileupload directive to receive an uploaded large data file, which results in a Source[ByteString, Any], which I would like to then stream to other subscriber HTTP servers.

I am using BroadcastHub to be able to broadcast the data (byteStrings: Source[ByteString, Any]) to the subscribers:

    val producer = byteStrings.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()

The problem is that the remote HTTP server receives the request, but not any data, so I think I must be missing a runWith(...) or something.
This is the method I am trying to use to stream the data to the other HTTP servers:

  def distribute(streamName: String, host: String, port: Int, producer: Source[ByteString, Any]): Future[Done] = {
    val uri = s"http://$host:$port$transferRoute/$streamName?dist=false"
    createUploadRequest(streamName, uri, producer).flatMap { request =>
      val responseFuture = Http().singleRequest(request)
      responseFuture.onComplete {
        case Success(response) =>
          println(s"Distributed data to $uri")

        case Failure(ex) =>
          println(s"Distributing data to $uri failed with $ex")
      } => Done)

  private def createUploadRequest(streamName: String, uri: Uri, producer: Source[ByteString, Any]): Future[HttpRequest] = {
    val bodyPart = Multipart.FormData.BodyPart(streamName,
      HttpEntity.IndefiniteLength(ContentTypes.`application/octet-stream`, producer),
      Map("fieldName" -> streamName, "filename" -> streamName)
    val body = FormData(bodyPart)
    Marshal(body).to[RequestEntity].map { entity =>
      HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)

Any ideas what might be missing?

On a related note: I assume it is more efficient to stream large data files via HTTP POST than to use a remote StreamRef to another actor in the cluster?


It turned out that the problem was that the stream was being emptied before it could be used, due to the way I was using BroadcastHub. Changing the stream to use a RunnableGraph with Broadcast and Merge fixed the problem.