Passing stream elements "into" AND "over" flow

Hi!

I’m wondering if there is a default / less verbose way of doing this:

private def overleaping[A, B](flow: Flow[A, B, NotUsed]): Flow[A, (A, B), NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val bcast: UniformFanOutShape[A, A] = builder.add(Broadcast[A](2))
    val zip: FanInShape2[A, B, (A, B)] = builder.add(Zip[A, B]())

    bcast.out(0) ~> zip.in0
    bcast.out(1) ~> flow ~> zip.in1

    FlowShape.of(bcast.in, zip.out)
  })
}

Supposed to work like this:

val flow = Flow[Int].map(_ * 17)

Source(List(1,2,3,4))
  .map { x =>
    // print only multiples of 3
    if (x % 3 == 0) println(s"incoming: $x")
    x
  }
  .via(overleaping(flow))
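  .map { case (x, result) =>
    // print only results derived from multiples of 3
    if (x % 3 == 0) println(s"result for $x: $result")
    result
  }
  .runWith(Sink.seq) // actually runs the stream; needs an implicit materializer (or ActorSystem) in scope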

If there is any other approach I should consider for this, I’d be fine with that too.

Just from reading this, two things come to mind.

First: If the Flow is a simple map, then you can do something like

def overleaping[A, B](fn: A => B): Flow[A, (A, B), NotUsed] = Flow[A].map(x => (x, fn(x)))

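Second: If the Flow is not a simple map (for example a mapConcat, a filter, or anything else that can drop or generate elements), then the broadcast-zip graph will deadlock. Zip needs one element on each input before it emits, and Broadcast only emits once all of its outputs have demand, so as soon as the flow emits fewer or more elements than it receives, the two branches stall waiting for each other. Read this or this for more info (btw the second link is mostly your case, I think).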
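To make the stall concrete, here is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem also provides the materializer). It rebuilds the broadcast-zip helper from the question and feeds it a filter; the object name OverleapingStall, the one-second timeout, and the sample data are illustrative choices, not something from the thread.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object OverleapingStall extends App {
  implicit val system: ActorSystem = ActorSystem("overleaping-stall")

  // Same shape as the helper in the question: one branch bypasses the flow,
  // the other goes through it, and Zip pairs them up again.
  def overleaping[A, B](flow: Flow[A, B, NotUsed]): Flow[A, (A, B), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[A](2))
      val zip   = builder.add(Zip[A, B]())
      bcast.out(0) ~> zip.in0
      bcast.out(1) ~> flow ~> zip.in1
      FlowShape(bcast.in, zip.out)
    })

  // A flow that drops elements: the very first element (1) is filtered out.
  val dropping = Flow[Int].filter(_ % 2 == 0)

  val done = Source(1 to 4).via(overleaping(dropping)).runWith(Sink.seq)

  // The stream is expected to stall, so waiting on it should fail with a timeout.
  val outcome = Try(Await.result(done, 1.second))
  println(outcome)

  system.terminate()
}
```

With a one-to-one flow such as Flow[Int].map(_ * 17) in place of the filter, the same graph should complete normally, which is why the helper is only safe for flows that emit exactly one element per input.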

@tg44 the real use case is this: I have an akka-http app which is set up something like this:

val routes: Route = ???

val fullFlow = Flow[HttpRequest]
  .map { req => 
    if (!isInternal(req)) logRequest(req)
    req
  }
  .via(routes)
  .map { resp =>
    // HERE
    // i want request as well as response
    // to make sure i don't log responses for internal requests
    // "internal request" in my case is e.g. a load balancer health check
    // which are happening pretty often so i don't want to clutter the logs
    // with useless data
  }

as you can see, there is no dropping etc, so it’s pretty straightforward in this case.

The problem here is that Route is a RequestContext ⇒ Future[RouteResult], which is implicitly converted into Flow[HttpRequest, HttpResponse, NotUsed] by http internals.

In theory i could just reimplement it like this:

val fullFlow = Flow[HttpRequest]
  .map { req => 
    if (!isInternal(req)) logRequest(req)
    req
  }
  .mapAsync(1)(req => handle(req).map(resp => req -> resp))
  .map { case (req, resp) =>
    // log the response here; logResponse is assumed to be the counterpart of logRequest
    if (!isInternal(req)) logResponse(resp)
    resp
  }

where handle replicates the internal logic of converting a Route into an HttpRequest => Future[HttpResponse], but I don’t like the idea of a) going to a lower level here and b) reimplementing what is already implemented in the internals.

Since Route is a [kind of] “classic” function (at least it’s total in some sense), I don’t see a possibility of anything weird happening, so it should be fine.
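As a side note on the handle function discussed above: depending on the Akka HTTP version, it may not have to be reimplemented. The sketch below assumes Akka HTTP 10.2 or later, where Route.toFunction turns a Route into an HttpRequest => Future[HttpResponse] (earlier 10.x releases offer Route.asyncHandler for the same purpose, with a longer list of implicits). The object name LoggedRouteFlow and the signatures of isInternal, logRequest, and logResponse are assumptions made for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

import scala.concurrent.{ExecutionContext, Future}

object LoggedRouteFlow {
  // isInternal, logRequest and logResponse stand in for the helpers from the
  // thread; their exact signatures are assumptions.
  def apply(
      routes: Route,
      isInternal: HttpRequest => Boolean,
      logRequest: HttpRequest => Unit,
      logResponse: HttpResponse => Unit
  )(implicit system: ActorSystem): Flow[HttpRequest, HttpResponse, NotUsed] = {
    implicit val ec: ExecutionContext = system.dispatcher

    // Akka HTTP 10.2+: expose the Route as a plain asynchronous handler function.
    val handle: HttpRequest => Future[HttpResponse] = Route.toFunction(routes)

    Flow[HttpRequest]
      .map { req =>
        if (!isInternal(req)) logRequest(req)
        req
      }
      // Keep each request next to its response so the next stage can see both.
      .mapAsync(1)(req => handle(req).map(resp => req -> resp))
      .map { case (req, resp) =>
        if (!isInternal(req)) logResponse(resp)
        resp
      }
  }
}
```

Because mapAsync emits exactly one result per input and preserves order, this variant avoids the broadcast-zip graph altogether while still pairing every response with its request.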

Hmm. That’s sad… There has been more talk about solving these kinds of use cases, but nothing has been implemented as far as I know…

I think there will be no better solution for this problem then, and the broadcast-zip will not be a bottleneck in this case.

(BTW, if this is HTTP you can also signal the “internality” with “custom” headers (for example, every health-check request gets a specific header), so you don’t need the request to solve the isItInternal? problem.)

isInternal is implemented exactly as a header check :)

thanks for your answer.

We have this pattern mentioned in the Alpakka docs as well: https://developer.lightbend.com/docs/alpakka/current/patterns.html#passthrough

We might want to package it up and make it accessible in a library like akka-stream-contrib. Let me know if that would be useful for you.
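For readers who do not follow the link, a packaged version could look roughly like the sketch below. It is modelled on the pass-through idea, not copied from the Alpakka docs; the name PassThrough and the curried combine parameter are assumptions. Like the broadcast-zip helper earlier in the thread, it is only safe when the processing flow emits exactly one element per input.

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, ZipWith}

object PassThrough {
  // Runs `processing` on every element and hands both the original element
  // and the processed result to `combine`.
  def apply[A, B, O](processing: Flow[A, B, NotUsed])(combine: (A, B) => O): Flow[A, O, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val bcast = builder.add(Broadcast[A](2))
      val zip   = builder.add(ZipWith[A, B, O](combine))

      bcast.out(0) ~> zip.in0               // original element bypasses the flow
      bcast.out(1) ~> processing ~> zip.in1 // processed element

      FlowShape(bcast.in, zip.out)
    })
}
```

For example, PassThrough(Flow[Int].map(_ * 17))((x, y) => (x, y)) would pair each input with its result, and a combine function that logs and then returns only the second argument would cover the request/response logging case from this thread.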

Oh, cool, that makes sense in the context of committable messages as well. Thanks for the pointer! It definitely won’t hurt to have this in contrib; I’ll probably open a PR with this.
