Flow.monitor (java)

https://doc.akka.io/japi/akka/current/akka/stream/javadsl/Flow.html#monitor-akka.japi.function.Function2-

I think this is the method that I need, but I find the documentation incomprehensible. Specifically what is the return value of the combine function expected to be?

Can someone please explain how to use it?

Mark

Hi Mark,
thanks for the question and reporting – the signature is needlessly complex indeed and we’ll fix that;
here’s the ticket about the signature: https://github.com/akka/akka/issues/24812

In general the signature exposed there is the “combine materialized values” version which you may sometimes need if there are multiple materialized values you want to keep. See these docs to learn about materialized values: Stream composition: Materialized Values

For now though, here is how to use it:

    Source<String, NotUsed> source = Source.single("anything");
    
    Source<String, FlowMonitor<String>> monitoredOnlyMonitor = 
        source.monitor(Keep.right()); // this should be the default, we'll fix the signatures
    
    //    // this you would do if the materialized value of the source would be of interest:
    //    Source<String, Pair<NotUsed, FlowMonitor<String>>> monitoredBoth = 
    //        source.monitor(Keep.both());

    FlowMonitor<String> monitor = monitoredOnlyMonitor.to(Sink.ignore()).run(materializer());

In other words, the parameter there tells which materialized value should be “kept”, in this case you want to keep the monitor of course, so Keep.right() as the monitor is on the “right side” (of the source).

Hope this helps and please monitor the above linked ticket for us fixing the signature issue.

What I am trying to do is to count the number/length of ByteString’s flowing and detect when the stream is finished or terminated. Specifically, I have an HttpResponse and wanted to pass a monitored flow to the transformEntityDataBytes method. Then, on completion, I could report the duration and the rate (bytes/second). In my initial attempt, the function was called just once with the FlowMonitorState of initialised.

I am still not sure how to achieve this (without implementing a complete new Flow).

Thanks for your help,
Mark