Passing stream elements "into" AND "over" flow

streams

(Roman Tkalenko) #1

Hi!

I’m wondering if there is a default / less verbose way of doing this:

private def overleaping[A, B](flow: Flow[A, B, NotUsed]): Flow[A, (A, B), NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val bcast: UniformFanOutShape[A, A] = builder.add(Broadcast[A](2))
    val zip: FanInShape2[A, B, (A, B)] = builder.add(Zip[A, B]())

    bcast.out(0) ~> zip.in0
    bcast.out(1) ~> flow ~> zip.in1

    FlowShape.of(bcast.in, zip.out)
  })
}

Supposed to work like this:

val flow = Flow[Int].map(_ * 17)

Source(List(1,2,3,4))
  .map { x =>
    // print only multiples of 3
    if (x % 3 == 0) println(s"incoming: $x")
    x
  }
  .via(overleaping(flow))
  .map { case (x, str) =>
    // print only result derived from multiples of 3:
    if (x % 3 == 0) println(s"result for $x: $str")
    str
  }
  .to(Sink.seq)

If there is any other approach I should consider for this, I’d be fine with that too.


(Gergő Törcsvári) #2

Just by reading this I got 2 things in my mind.

First: If the Flow is a simple map then you can do sth like

def overleaping[A, B](fn: A => B): Flow[A, (A, B), NotUsed] = Flow[A].map(x => (x, fn(x)))

Second: If the Flow is not a simple map (for example a mapConcat, a filter or something that can drop/generate elements), then you will get a huge deadlock. Read this or this for more info (btw the second link is mostly your case I think).


(Roman Tkalenko) #3

@tg44 real use-case is this: I have an akka-http app which is set up sth like this:

val routes: Route = ???

val fullFlow = Flow[HttpRequest]
  .map { req => 
    if (!isInternal(req)) logRequest(req)
    req
  }
  .via(routes)
  .map { resp =>
    // HERE
    // i want request as well as response
    // to make sure i don't log responses for internal requests
    // "internal request" in my case is e.g. a load balancer health check
    // which are happening pretty often so i don't want to clutter the logs
    // with useless data
  }

as you can see, there is no dropping etc, so it’s pretty straightforward in this case.

The problem here is that Route is a RequestContext ⇒ Future[RouteResult], which is implicitly converted into Flow[HttpRequest, HttpResponse, NotUsed] by http internals.

In theory i could just reimplement it like this:

val fullFlow = Flow[HttpRequest]
  .map { req => 
    if (!isInternal(req)) logRequest(req)
    req
  }
  .mapAsync(1)(req => handle(req).map(resp => req -> resp))
  .map { case (req, resp) =>
     if (!isInternal(req)) logRequest(req)
    resp
  }

where handle is replicating internal logic of converting Route into a HttpRequest => Future[HttpResponse], but I don’t like the idea of a) going on a lower level here and b) reimplementing what is already implemented in the internals.

Since Route is a [kind of] “classic” function (at least it’s total in some sense), I don’t see a possibility of anything weird happening, so it should be fine.


(Gergő Törcsvári) #4

Hmm. Thats sad… There were more talking about solving these kind of use-cases, but nothing implemented as I know…

I think there will be no better solution for this problem then, and the broadcast-zip will not be a bottleneck in this case.

(BTW if this is http you can signal the “internality” as “custom” headers too (every healthcheck request gets a specific header for ex), so you don’t need the reqest to solve the isItInternal? problem.)


(Roman Tkalenko) #5

isInternal is implemented exactly as a header check :slight_smile:

thanks for your answer.


(Martynas Mickevičius) #6

We have this pattern mentioned in the Alpakka docs as well: https://developer.lightbend.com/docs/alpakka/current/patterns.html#passthrough

We might want to package it up and make it accessible in a library like akka-stream-contrib. Let me know if that would be useful for you.


(Roman Tkalenko) #7

Oh, cool, makes sense in context of commottable messages as well. Thanks for this pointer! Definitely won’t hurt having this in contrib, I’ll probably open a P.R. with this.