Combine MergeHub with MergePrioritized

Hi there,

I am trying to create the following graph:

 *                                  +--------+
 *                        +-------->| Prio 1 |+           +-----------+
 *                        |         +--------+|---------->| MergeHub1 |---------+
 *                        |          +--------+           +-----------+         |
 *                        |                                                     |
 *               +------------+     +--------+                                  V
 *         ----->| Partitioner|+--->| Prio 2 |+           +-----------+      +------------------+
 *               +------------+|    +--------+|---------->| MergeHub2 |----->| MergePrioritized |
 *                +------------+     +--------+           +-----------+      +------------------+
 *                         |                                                     ^    |
 *                         |         +--------+                                  |    |
 *                         +-------->| Prio 3 |+           +-----------+         |    |
 *                                   +--------+|---------->| MergeHub3 |---------+    |
 *                                    +--------+           +-----------+              |
 *                                                                                    V
 *                                                                           +---------------+
 *                                                                           | Service Sink |
 *                                                                           +---------------+

The goal of this graph is to support a sink that processes incoming requests from a dynamic set of sources while taking the priority of each request into account. So high priority requests should be executed with a higher chance than lower priority requests.

So if for instance, I create the mergePrioritized with priorities (6,3,1) and connect three sources (each one producing elements with one priority) to this graph, I would expect the source with the highest prio elements to be consumed 3 times faster than the second prio source and 6 times fast than the low prio source.

In practice, it turns out (I use Akka streams 2.5.31) that this only sometimes behaves as I want. After a lot of trial and error, I figured out that it behaves as it should only if I throttle the service sink. If I don’t throttle the service sink, there is little to no effect of the priorities.

My gut feeling about this is that this is caused by the internal implementation of the MergeHub combined with MergePrioritized that makes it so that if the service sink consumes too fast, the MergeHubs just don’t have made their values available to the merge prioritized. In other words, the sink consumes quickly all elements from the highest prio, then switches to the medium and then to the lowest before giving the high prio mergehub the chance to provide more values. This is counter intuitive because in my case my source flows produce instantly (Source.range(…).map(…)).

Is there a proper way to handle this problem without artificially slowing down Service Sink? For now, I just put a throttle(1, Duration.ofMillis(10)) and that seems to work but these values are very ad hoc and could be very env dependent.

thanks,

Bert