Passing stream elements "into" AND "over" flow


(Roman Tkalenko) #1


I’m wondering if there is a default / less verbose way of doing this:

private def overleaping[A, B](flow: Flow[A, B, NotUsed]): Flow[A, (A, B), NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val bcast: UniformFanOutShape[A, A] = builder.add(Broadcast[A](2))
    val zip: FanInShape2[A, B, (A, B)] = builder.add(Zip[A, B]())

    bcast.out(0) ~> zip.in0
    bcast.out(1) ~> flow ~> zip.in1

    FlowShape.of(, zip.out)

Supposed to work like this:

val flow = Flow[Int].map(_ * 17)

  .map { x =>
    // print only multiples of 3
    if (x % 3 == 0) println(s"incoming: $x")
  .map { case (x, str) =>
    // print only result derived from multiples of 3:
    if (x % 3 == 0) println(s"result for $x: $str")

If there is any other approach I should consider for this, I’d be fine with that too.

(Gergő Törcsvári) #2

Just by reading this I got 2 things in my mind.

First: If the Flow is a simple map then you can do sth like

def overleaping[A, B](fn: A => B): Flow[A, (A, B), NotUsed] = Flow[A].map(x => (x, fn(x)))

Second: If the Flow is not a simple map (for example a mapConcat, a filter or something that can drop/generate elements), then you will get a huge deadlock. Read this or this for more info (btw the second link is mostly your case I think).

(Roman Tkalenko) #3

@tg44 real use-case is this: I have an akka-http app which is set up sth like this:

val routes: Route = ???

val fullFlow = Flow[HttpRequest]
  .map { req => 
    if (!isInternal(req)) logRequest(req)
  .map { resp =>
    // HERE
    // i want request as well as response
    // to make sure i don't log responses for internal requests
    // "internal request" in my case is e.g. a load balancer health check
    // which are happening pretty often so i don't want to clutter the logs
    // with useless data

as you can see, there is no dropping etc, so it’s pretty straightforward in this case.

The problem here is that Route is a RequestContext ⇒ Future[RouteResult], which is implicitly converted into Flow[HttpRequest, HttpResponse, NotUsed] by http internals.

In theory i could just reimplement it like this:

val fullFlow = Flow[HttpRequest]
  .map { req => 
    if (!isInternal(req)) logRequest(req)
  .mapAsync(1)(req => handle(req).map(resp => req -> resp))
  .map { case (req, resp) =>
     if (!isInternal(req)) logRequest(req)

where handle is replicating internal logic of converting Route into a HttpRequest => Future[HttpResponse], but I don’t like the idea of a) going on a lower level here and b) reimplementing what is already implemented in the internals.

Since Route is a [kind of] “classic” function (at least it’s total in some sense), I don’t see a possibility of anything weird happening, so it should be fine.

(Gergő Törcsvári) #4

Hmm. Thats sad… There were more talking about solving these kind of use-cases, but nothing implemented as I know…

I think there will be no better solution for this problem then, and the broadcast-zip will not be a bottleneck in this case.

(BTW if this is http you can signal the “internality” as “custom” headers too (every healthcheck request gets a specific header for ex), so you don’t need the reqest to solve the isItInternal? problem.)

(Roman Tkalenko) #5

isInternal is implemented exactly as a header check :slight_smile:

thanks for your answer.

(Martynas Mickevičius) #6

We have this pattern mentioned in the Alpakka docs as well:

We might want to package it up and make it accessible in a library like akka-stream-contrib. Let me know if that would be useful for you.

(Roman Tkalenko) #7

Oh, cool, makes sense in context of commottable messages as well. Thanks for this pointer! Definitely won’t hurt having this in contrib, I’ll probably open a P.R. with this.