How to stream a single Source[ByteString, Any] to multiple HTTP servers (and also broadcast to websocket flows)

I have an akka-http server that receives a possibly large stream of ByteStrings and needs to write the data to a number of websockets as well as other HTTP servers.

I can use a broadcast graph for writing to the websockets, however the only API I can find for streaming to an HTTP server requires a Source for the data. For example:

  def makeHttpRequest(uri: String, data: Source[ByteString, Any]): HttpRequest = {
    HttpRequest(HttpMethods.POST, uri, 
      entity = HttpEntity(ContentTypes.`application/octet-stream`, data))
  }

The incoming data is a live stream, so its Source can only be consumed once; I can’t use it both in the HTTP request and in the other places.
I tried using BroadcastHub to get a reusable source:

    val runnableGraph  = source.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.right)
    val reusableSource = runnableGraph.run()

However, the problem then is that in some cases the stream completes before all consumers have had a chance to read from it.
Here is a small example that shows how that can happen:

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
  import scala.concurrent.Future
  import scala.util.{Failure, Success}

  implicit val system       = ActorSystem("StreamTest")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val source         = Source(0 to 5)
  val runnableGraph  = source.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.right)
  val reusableSource = runnableGraph.run()

  // "A": a fast consumer
  val a = reusableSource
    .mapAsync(1) { i =>
      Future { println(s"A: $i") }
    }
    .runWith(Sink.ignore)

  // "B": a slow consumer (sleeps 100 ms per element)
  val b = reusableSource
    .alsoTo(Sink.foreach(i => println(s"B1: $i")))
    .mapAsync(1) { i =>
      Future { Thread.sleep(100); println(s"B2: $i") }
    }
    .runWith(Sink.ignore)

  val c = reusableSource.runWith(Sink.ignore)

  Future.sequence(List(a, b, c)).onComplete {
    case Success(_) =>
      println("OK")
      system.terminate()
    case Failure(ex) =>
      println(s"Error: $ex")
      system.terminate()
  }

In the above test “A” completes the stream before “B” can consume it.

Another thing I tried was MergeHub, to get a sink that I could connect inside a broadcast graph and whose source I could hand to the HTTP request:

      val (remoteSink, remoteSource)  = MergeHub.source[ByteString](1).preMaterialize()

However that resulted in this error:

[ERROR] [01/16/2019 19:32:44.783] [vbds-system-akka.actor.default-dispatcher-16] [akka://vbds-system/system/StreamSupervisor-0/flow-5-0-ignoreSink] Error in stage [akka.stream.scaladsl.MergeHub$$anon$2@4b6a39de]: Upstream producer failed with exception, removing from MergeHub now
akka.stream.scaladsl.MergeHub$ProducerFailed: Upstream producer failed with exception, removing from MergeHub now
	at akka.stream.scaladsl.MergeHub$$anon$2$$anon$3.onUpstreamFailure(Hub.scala:271)
	at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:505)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:472)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
	at akka.actor.ActorCell.invoke(ActorCell.scala:557)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.http.scaladsl.model.EntityStreamException: Entity stream truncation
	at akka.http.impl.engine.parsing.HttpMessageParser$$anonfun$2.applyOrElse(HttpMessageParser.scala:330)
	at akka.http.impl.engine.parsing.HttpMessageParser$$anonfun$2.applyOrElse(HttpMessageParser.scala:328)
	at akka.stream.impl.fusing.Collect$$anon$6.$anonfun$wrappedPf$1(Ops.scala:217)
	at akka.stream.impl.fusing.SupervisedGraphStageLogic.withSupervision(Ops.scala:178)
	at akka.stream.impl.fusing.Collect$$anon$6.onPush(Ops.scala:219)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
	at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
	... 18 more

For now I am doing a separate POST to the other HTTP servers for each ByteString received, but what I really want is to stream the source in the HTTP request, the same way as when you upload a file.

Any suggestions on how to do this?

Note that the HTTP servers also form an akka cluster, but I assume using akka messages to send the data would not be recommended (since it is bulk data). StreamRefs are also not recommended for bulk data.

It’s a tricky problem: a live stream that can only be consumed once, like the HTTP request body, has to be fed to several downstreams, each of which could fail, etc. The only way to make that source re-streamable from the beginning, or streamable multiple times that are not in parallel, is to buffer it completely in memory or on disk.
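
For example, buffering completely in memory could look roughly like this (a minimal sketch, only viable if you know the payload fits comfortably in memory; the helper name is made up):

  import akka.stream.Materializer
  import akka.stream.scaladsl.Source
  import akka.util.ByteString
  import scala.concurrent.{ExecutionContext, Future}

  // Drain the one-shot source into memory once, then hand out a fresh Source
  // over the buffered bytes as many times as needed.
  def buffered(data: Source[ByteString, Any])(
      implicit mat: Materializer, ec: ExecutionContext): Future[Source[ByteString, Any]] =
    data
      .runFold(ByteString.empty)(_ ++ _)   // read the whole stream into one ByteString
      .map(bytes => Source.single(bytes))  // this blueprint can be materialized repeatedly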

For the parallel case you should be able to use broadcast with the graph DSL. The tricky part there is that you need a sink per downstream to connect the broadcast to, but you also need a source for that sink to pass to the HTTP request. There is no such Sink->Source operator out of the box, but there is a PR in progress adding one (https://github.com/akka/akka/pull/25150), so maybe you could take inspiration from that to achieve the same.
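
Very roughly, the parallel broadcast could be wired up like this with the graph DSL (just a sketch; `websocketSink` is a placeholder for your websocket side, and `httpRequestSink` would be the sink half of such a Sink->Source pair):

  import akka.NotUsed
  import akka.stream.ClosedShape
  import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
  import akka.util.ByteString

  // Fan the incoming data out to a websocket sink and to the sink whose
  // source is handed to the outgoing HTTP request entity.
  def broadcastData(data: Source[ByteString, Any],
                    websocketSink: Sink[ByteString, Any],
                    httpRequestSink: Sink[ByteString, Any]): RunnableGraph[NotUsed] =
    RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[ByteString](2))
      data         ~> bcast.in
      bcast.out(0) ~> websocketSink
      bcast.out(1) ~> httpRequestSink
      ClosedShape
    })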

The BroadcastHub is something different, meant for when you have one upstream running for a longer while and downstreams with different lifespans from the upstream coming and going, so I don’t think that will be useful for you.

Thanks for the pointer to the sinkToSource function. That solved the problem for me.

I used code like this:

  import akka.NotUsed
  import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

  /**
   * Materializes into a sink connected to a source, i.e. the sink pushes into the source:
   *
   * +----------+       +----------+
   * >   Sink   |------>|  Source  >
   * +----------+       +----------+
   *
   * Should be provided by Akka Streams, see https://github.com/akka/akka/issues/24853.
   */
  def sinkToSource[M]: RunnableGraph[(Sink[M, NotUsed], Source[M, NotUsed])] =
    Source
      .asSubscriber[M]
      .toMat(Sink.asPublisher[M](fanout = false))(Keep.both)
      .mapMaterializedValue {
        case (sub, pub) ⇒ (Sink.fromSubscriber(sub), Source.fromPublisher(pub))
      }


...

    def remoteFlow(h: ServerInfo): (Future[Done], Flow[ByteString, ByteString, NotUsed]) = {
      // Materialize a connected sink/source pair: bytes pushed into `sink` come out of `source`
      val (sink, source) = sinkToSource[ByteString].run()
      // Stream `source` as the request entity to the remote server
      val f = Source
        .single(makeHttpRequest(streamName, h, source))
        .via(remoteConnections(h))
        .runWith(Sink.ignore)

      // The returned flow copies each ByteString into `sink` while passing it downstream
      (f, Flow[ByteString].alsoTo(sink))
    }

...

  def makeHttpRequest(streamName: String, serverInfo: ServerInfo, source: Source[ByteString, Any]): HttpRequest = {
    HttpRequest(
      HttpMethods.POST,
      s"http://${serverInfo.host}:${serverInfo.port}$distRoute/$streamName",
      entity = HttpEntity(ContentTypes.`application/octet-stream`, source)
    )
  }


and then used the returned Flow in the broadcast graph. I found that I also had to wait for the future returned from the HTTP request to be sure that the upload was completed.
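
For reference, the surrounding wiring looks roughly like this (a simplified sketch building on `remoteFlow` and `ServerInfo` above; `servers` and `incomingData` are placeholders, and in the real code the flows are attached inside the broadcast graph rather than with chained alsoTo):

    import akka.Done
    import akka.stream.Materializer
    import akka.stream.scaladsl.{Sink, Source}
    import akka.util.ByteString
    import scala.concurrent.{ExecutionContext, Future}

    // Attach one remoteFlow per server and wait for both the local stream and
    // all remote uploads to finish before reporting completion.
    def distribute(servers: List[ServerInfo], incomingData: Source[ByteString, Any])(
        implicit mat: Materializer, ec: ExecutionContext): Future[Done] = {
      val (uploadFutures, remoteFlows) = servers.map(remoteFlow).unzip
      val localDone = remoteFlows
        .foldLeft(incomingData)((src, flow) => src.alsoTo(flow.to(Sink.ignore)))
        .runWith(Sink.ignore)
      Future.sequence(localDone :: uploadFutures).map(_ => Done)
    }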
