I need to collect a metric on how much time each event spent in a certain flow. The main use-case is to collect the time the ElasticsearchFlow spent writing each element.
I’ve had a look at the contrib module which has some timing-flow, but unfortunately not covering my use-case. Then I came accross this LatencyMonitor which is almost what I need, with the only exception that it is collecting averages instead of actual “response” times.
Furthermore I need to only monitor response times which have been written successfully (because only those are relevant for the metric histogram)
So I started my own custom flow which wraps another flow. Now I am facing two challenges:
- Flow-API: how to pass some TimerContext from a startFlow to the endFlow
- GraphStage: how to get the actual output-value from the wrapped flow in order to check the condition
Regarding 1: I first thought to just pass a Tuple2 including the Timer, but since I have no control over the types in the ElasticsearchFlow this doesnt work.
Regarding 2: just using the handlers onPush and onPull would not yield correct result since onPull is called once the sink signaled demand (which can be much later than when the wrapped flow was finished), and I havent found a way the get the out-value of the wrapped flow.
Another idea might be the use of the Graph-DSL, which splits and merges the flow, meaning that the TimerContext would move in paralell to the wrapped flow and the merge would do the logic. But this seems quite heavy for a “simple” timer.
Did I miss anything in the existing API? Is this even possible, or am I conceptually wrong here?