How to stream the result of a fileupload (Source[ByteString, Any]) to another HTTP server

Hi all,

I have an HTTP server that uses the fileupload directive to receive an uploaded large data file, which results in a Source[ByteString, Any], which I would like to then stream to other subscriber HTTP servers.

I am using BroadcastHub to be able to broadcast the data (byteStrings: Source[ByteString, Any]) to the subscribers:

    val producer = byteStrings.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()

The problem is that the remote HTTP server receives the request, but not any data, so I think I must be missing a runWith(...) or something.
This is the method I am trying to use to stream the data to the other HTTP servers:

  def distribute(streamName: String, host: String, port: Int, producer: Source[ByteString, Any]): Future[Done] = {
    val uri = s"http://$host:$port$transferRoute/$streamName?dist=false"
    createUploadRequest(streamName, uri, producer).flatMap { request =>
      val responseFuture = Http().singleRequest(request)
      responseFuture.onComplete {
        case Success(response) =>
          println(s"Distributed data to $uri")
          response.discardEntityBytes()

        case Failure(ex) =>
          println(s"Distributing data to $uri failed with $ex")
      }
      responseFuture.map(_ => Done)
    }
  }

  private def createUploadRequest(streamName: String, uri: Uri, producer: Source[ByteString, Any]): Future[HttpRequest] = {
    val bodyPart = Multipart.FormData.BodyPart(streamName,
      HttpEntity.IndefiniteLength(ContentTypes.`application/octet-stream`, producer),
      Map("fieldName" -> streamName, "filename" -> streamName)
    )
    val body = FormData(bodyPart)
    Marshal(body).to[RequestEntity].map { entity =>
      HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
    }
  }

Any ideas what might be missing?

On a related note: I assume it is more efficient to stream large data files via HTTP POST than to use a remote StreamRef to another actor in the cluster?

Thanks,
Allan

It turned out that the problem was that the stream was being emptied before it could be used, due to the way I was using BroadcastHub. Changing the stream to use a RunnableGraph with Broadcast and Merge fixed the problem.