Using Flow as Sink/Source

Hi All,

I’m having a bit of difficulty combining some akka-streams components. The goal is to be able to return a Sink[ByteString, _] that other components can use, but I’ve been unable to figure out how to expose the source and sink sides of a flow correctly. My code is roughly as follows:

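```scala
// Roughly what I tried. Assumes an implicit ExecutionContext in scope;
// Marshal and RequestEntity come from akka-http.
def getOutbound(): Future[Sink[ByteString, _]] = Future.successful {
  Sink.fromMaterializer { (mat, attr) =>
    Sink.futureSink {
      val flow = Flow[ByteString]
      val source = Source.fromGraph(flow) // this is incorrect: a Flow has an inlet and an outlet, so it is not a Source

      val create = for {
        thing <- Marshal(source).to[RequestEntity] // need to provide Source[ByteString, _]
        // make http request
      } yield {
        Sink.fromGraph(flow) // this is incorrect: likewise, a Flow is not a Sink
      }
      create // futureSink needs the Future[Sink] as the block's result
    }
  }
}
```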

Can anyone give me some pointers on the correct way to do this? My gut feeling is that it shouldn’t require anything exotic. What I really want is to treat the HTTP request marshaller as a sink, so that I can write something like Flow[ByteString].to(myOutboundRequest).
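Treating the request as a sink mostly comes down to obtaining a Sink and a Source that act as two ends of one pipe, which a single Flow value cannot give you. A minimal sketch, assuming Akka 2.6 (where an implicit `ActorSystem` supplies the materializer): pre-materialize `Source.asSubscriber` and wrap the resulting Reactive Streams `Subscriber` with `Sink.fromSubscriber`. The `linkedPair` name is hypothetical, and the fold stands in for the HTTP entity. The pair is single-use, because a Subscriber accepts only one upstream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object LinkedPair {
  // Hypothetical helper: a Sink and a Source joined back to back.
  // Elements, completion and failure pushed into the Sink come out of the Source.
  def linkedPair[T](implicit mat: Materializer): (Sink[T, NotUsed], Source[T, NotUsed]) = {
    // Materializes the Subscriber right away; the returned Source emits what it receives.
    val (subscriber, source) = Source.asSubscriber[T].preMaterialize()
    (Sink.fromSubscriber(subscriber), source)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("linked-pair")
    val (sink, source) = linkedPair[ByteString]
    // In real code `source` would go into the HTTP entity; here it is just folded.
    val body: Future[ByteString] = source.runFold(ByteString.empty)(_ ++ _)
    Source(List("a", "b", "c")).map(ByteString(_)).runWith(sink)
    println(Await.result(body, 3.seconds).utf8String)
    system.terminate()
  }
}
```

Note that this version is eager: the pair exists as soon as `linkedPair` is called, not when the Sink is materialized.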

Hi @BoopBoopBeepBoop,

could you describe what the semantics should be when you materialize the resulting stream?

At which point should a request be made? What would be the advantage of the API you are trying to build over calling a method that takes the Source and makes the request directly?
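One possible answer to "at which point should a request be made", sketched under assumptions: if the request is started inside `mapMaterializedValue`, building the Sink does nothing, and every materialization starts a fresh request. This assumes Akka 2.6 and bridges the two streams with `Sink.asPublisher(fanout = false)` and `Source.fromPublisher`. `requestSink` and its `startRequest` parameter are hypothetical; in real code `startRequest` would wrap the Source in an `HttpEntity` and call the HTTP client, whereas here it only counts bytes.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RequestSink {
  // Hypothetical helper: `startRequest` stands in for "build an entity from this Source and send it".
  def requestSink[R](startRequest: Source[ByteString, NotUsed] => Future[R]): Sink[ByteString, Future[R]] =
    Sink
      .asPublisher[ByteString](fanout = false) // materializes a Publisher fed by this Sink
      .mapMaterializedValue(publisher => startRequest(Source.fromPublisher(publisher))) // runs once per materialization

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("request-sink")
    val requests = new AtomicInteger(0)
    val sink = requestSink { body =>
      requests.incrementAndGet() // count how many "requests" were started
      body.runFold(0)(_ + _.length) // pretend upload: total number of bytes sent
    }
    println(s"after building: ${requests.get}")
    val first = Source.single(ByteString("hello")).runWith(sink)
    val second = Source.single(ByteString("hi")).runWith(sink)
    println(s"sizes: ${Await.result(first, 3.seconds)}, ${Await.result(second, 3.seconds)}")
    println(s"after two runs: ${requests.get}")
    system.terminate()
  }
}
```

The trade-off against a plain method taking a Source is that the Sink can be materialized any number of times, each time issuing its own request.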

Johannes

I’ve gotten something working, but it’s suboptimal: I think it can currently fail without fully propagating errors.

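```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Assumes an implicit ExecutionContext in scope; `url` is defined in the omitted code.
override def write(): Future[(String, Sink[ByteString, Future[HttpResponse]])] = {
  val sink =
    // putting it in fromMaterializer means nothing in here happens until the stream is materialized
    Sink.fromMaterializer { (mat, _) =>
      val (queue, qSource) =
        Source.queue[ByteString](100, OverflowStrategy.fail).preMaterialize()(mat)
      val chunkedEntity = HttpEntity(ContentTypes.NoContentType, qSource)
      val uploadFuture: Future[HttpResponse] = ??? // make HTTP request here, using chunkedEntity

      Sink
        .foreachAsync[ByteString](parallelism = 1) { byteString =>
          queue.offer(byteString).map(_ => ())
        }
        .mapMaterializedValue { result =>
          // ensure that we pass the completion/failure signal on to the HTTP stream
          result.onComplete {
            case Success(_)         => queue.complete()
            case Failure(exception) => queue.fail(exception)
          }
          uploadFuture
        }
    }

  Future.successful((url, sink.mapMaterializedValue(_.flatMap(identity))))
}
```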

I’ve removed a few bits around the HTTP request to keep the example clean. If I’ve understood this correctly, this returns a Sink whose behavior is not triggered until the stream is materialized, so the request is only launched if the Sink is actually used. Inside, the Sink creates a queue-backed Source and hands it to the HTTP entity. We have to do this (if I’m understanding correctly) because the materialization of the entity's stream is owned by Akka HTTP. A foreachAsync Sink then pipes elements into the queue. I haven’t yet figured out how to map a failure of the HTTP stream back to the wrapping Sink.
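Part of the error-propagation gap in the queue-based version is that its offer step throws away the `QueueOfferResult`, so a queue that the HTTP side has closed or failed goes unnoticed. A sketch of a stricter offer step, assuming Akka 2.6's two-argument `Source.queue` (which materializes a `SourceQueueWithComplete`). `offerOrFail` is a hypothetical helper, and the failing `Sink.foreach` stands in for an HTTP upload that breaks. It covers only the offer side; the `onComplete` hand-off from the original code is still needed for the other direction.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import akka.util.ByteString
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StrictOffer {
  // Hypothetical helper: anything other than Enqueued becomes a failed Future,
  // which makes foreachAsync fail and cancel the caller's stream.
  // (A failed offer Future also propagates through flatMap unchanged.)
  def offerOrFail[T](queue: SourceQueueWithComplete[T], elem: T)(implicit ec: ExecutionContext): Future[Unit] =
    queue.offer(elem).flatMap {
      case QueueOfferResult.Enqueued    => Future.unit
      case QueueOfferResult.Failure(e)  => Future.failed(e)
      case QueueOfferResult.QueueClosed => Future.failed(new IllegalStateException("queue closed by the HTTP side"))
      case QueueOfferResult.Dropped     => Future.failed(new IllegalStateException("element dropped"))
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("strict-offer")
    implicit val ec: ExecutionContext = system.dispatcher
    val (queue, qSource) = Source.queue[ByteString](100, OverflowStrategy.fail).preMaterialize()
    // Stand-in for the HTTP upload: fails on the first element it receives.
    qSource.runWith(Sink.foreach[ByteString](_ => throw new RuntimeException("upload failed")))
    // The caller's stream never ends on its own; it only stops because an offer fails.
    val writer: Future[Done] =
      Source.repeat(ByteString("x")).runWith(Sink.foreachAsync[ByteString](1)(bs => offerOrFail(queue, bs)))
    Await.ready(writer, 5.seconds)
    println(writer.value)
    system.terminate()
  }
}
```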

Am I understanding correctly that these need to be separate streams because they have different lifetimes? The wrapper Sink's lifetime is owned by whoever chooses to materialize it, while the HTTP Source's lifetime is managed by the HTTP infrastructure?
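On the lifetime question: the caller's stream and the HTTP entity's stream are indeed materialized separately, but they can be linked so that signals cross between them. A minimal sketch, assuming Akka 2.6, that connects them with `Sink.asPublisher(fanout = false)` and `Source.fromPublisher`; `bridge` is a hypothetical name and the fold stands in for the HTTP upload. A failure in the caller's stream reaches the upload side, so the materialized `Future` fails. In the other direction, a failing upload cancels the bridge and thereby the caller's stream, and the HTTP error itself is visible through that same materialized `Future`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object BridgeFailure {
  // Hypothetical helper: the inner (HTTP-side) stream is materialized by `upload`,
  // but reads from a Publisher fed by the outer (caller-side) stream.
  def bridge[R](upload: Source[ByteString, NotUsed] => Future[R]): Sink[ByteString, Future[R]] =
    Sink.asPublisher[ByteString](fanout = false).mapMaterializedValue(pub => upload(Source.fromPublisher(pub)))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("bridge-failure")
    val sink = bridge(body => body.runFold(ByteString.empty)(_ ++ _))
    // The caller's stream emits one element and then fails...
    val failing = Source(List(ByteString("a"))).concat(Source.failed(new RuntimeException("producer failed")))
    val uploaded = failing.runWith(sink)
    // ...and the upload side's Future fails as well.
    println(Try(Await.result(uploaded, 3.seconds)))
    system.terminate()
  }
}
```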