Ideas on how to implement a latency monitor which wraps a flow

Hello everyone

I need to collect a metric on how much time each event spent in a certain flow. The main use-case is to collect the time the ElasticsearchFlow spent writing each element.

I’ve had a look at the contrib module, which has a timing flow, but unfortunately it does not cover my use-case. Then I came across this LatencyMonitor, which is almost what I need, except that it collects averages instead of actual “response” times.

Furthermore, I only need to monitor the response times of elements which have been written successfully (because only those are relevant for the metric histogram).

So I started my own custom flow which wraps another flow. Now I am facing two challenges:

  1. Flow-API: how to pass some TimerContext from a startFlow to the endFlow
  2. GraphStage: how to get the actual output-value from the wrapped flow in order to check the condition

Regarding 1: I first thought to just pass a Tuple2 including the Timer, but since I have no control over the types in the ElasticsearchFlow this doesn't work.

Regarding 2: just using the handlers onPush and onPull would not yield correct results, since onPull is called once the sink has signaled demand (which can be much later than when the wrapped flow finished), and I haven't found a way to get the out-value of the wrapped flow.

Another idea might be to use the Graph-DSL to split and merge the flow, meaning that the TimerContext would move in parallel to the wrapped flow and the merge would do the logic. But this seems quite heavy for a “simple” timer.

Did I miss anything in the existing API? Is this even possible, or am I conceptually wrong here?

After some more thought, I will try to use the GraphDSL:

x: the element flowing through the stream
t: started timer 
                 TimerStartShape (t out)                 -> (t in) TimerEndShape 
source -> (x in) TimerStartShape (x out) -> wrapped-flow -> (x in) TimerEndShape (x out) -> sink

TimerEndShape will either take a function which receives the elapsed time, or expose an additional Outlet for it.

There are some problems with this approach as well, e.g. the wrapped flow must not drop any elements or change their order, otherwise timers and results no longer line up.
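
For reference, the diagram above could be sketched roughly like this with the plain GraphDSL, using Broadcast as the "start" side and Zip as the "end" side. This is only a minimal sketch, not the implementation discussed in this thread: TimedFlow and maxInFlight are made-up names, and it assumes the wrapped flow emits exactly one element per input, in order. The end timestamp is taken directly behind the wrapped flow and both branches are buffered, so that slow downstream demand affects the measurement less (it is not eliminated once the buffers are full).

```scala
import akka.NotUsed
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

import scala.concurrent.duration._

// Hypothetical helper, not part of Akka or Akka Contrib.
object TimedFlow {

  // Pairs every output of `wrapped` with the time between the input being
  // broadcast and the output leaving `wrapped`.
  // Assumption: `wrapped` is strictly 1:1 and order-preserving.
  def apply[In, Out](wrapped: Flow[In, Out, Any],
                     maxInFlight: Int = 16): Flow[In, (Out, FiniteDuration), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val split = builder.add(Broadcast[In](2))
      val join  = builder.add(Zip[Long, (Out, Long)]())

      // Timer branch ("t" in the diagram): remember when the element entered.
      split.out(0)
        .map(_ => System.nanoTime())
        .buffer(maxInFlight, OverflowStrategy.backpressure) ~> join.in0

      // Element branch ("x" in the diagram): stamp the result as soon as the
      // wrapped flow emits it, before it waits for downstream demand.
      split.out(1)
        .via(wrapped)
        .map(out => (out, System.nanoTime()))
        .buffer(maxInFlight, OverflowStrategy.backpressure) ~> join.in1

      // Zip matches the n-th start time with the n-th result.
      val timed = join.out.map { case (start, (out, end)) => (out, (end - start).nanos) }

      FlowShape(split.in, timed.outlet)
    })
}
```

Something like TimedFlow(wrappedFlow).map { case (result, took) => ... } would then let the success check and the metrics call live in an ordinary map stage. If the wrapped flow drops or reorders elements, Zip will silently pair results with the wrong start times, which is exactly the limitation mentioned above.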

I guess you are looking for something inside your application, but just FYI our commercial Lightbend Telemetry provides that and more for streams https://developer.lightbend.com/docs/telemetry/current/instrumentations/akka-streams/akka-streams.html

I was able to finish my implementation, though I am not 100% certain that my logic handles backpressure correctly. I am willing to contribute my solution to the Akka Contrib module if there is any interest in this from the community. At the moment I only support a Scala API and would need to add the Java counterpart.

At the moment the usage looks like this:

import com.stuff.akka.streams.Implicits._

Elasticsearch.prepareElasticsearchFlow
  .measureLatency { result =>
    if (result.outcome.success) {
      // ... push result to metrics backend with "result.measuredTime"
    }
  }
  // .map ... continue with the result from the wrapped flow (Elasticsearch result)

In addition to the timer, the result also contains the wrapped flow's result, so that additional logic can be added.