Strange behavior. MergePrioritized doesn't pull new elements

Hi all,

I have a flow that looks like following:

          |----------------------------------------------------------------------------------|
          |  |--------------------------------------------------------------|                |
          |  |                                                              |                |
          |  |                                             |------------| --|  |---------| --|
          |  |-------->>>>> |-------|                      | partition1 |      | partit2 |
          |------------>>>> |       |     |----------| --> |------------| ---->|---------| -------->>>|-------|
    ----------------------> | merge0| --> | partit_0 | --------------------------------------------->>| merge1| -->
          |-------------->> |       |     |----------| --> |------------| ----->|-------------| ----->|-------|
          |  |---------->>> |-------|                      | partition3 |       | partition4  |
          |  |                                             |------------| --|   |-------------| --|
          |  |--------------------------------------------------------------|                     |
          |---------------------------------------------------------------------------------------|

Where all merge shapes are MergePrioritized and all partition shapes are Partition. All merge shapes have eagerComplete=false and all partitions have eagerCancel=false.

This flow just stuck all the time. I figured out that for particular run partit_0 partition emits 591 elements to partition1. Then partition function in partition1 redirects 46 elements to Outlet_0 and 545 elements to Outlet_1, however, partition itself emits only 45 elements to the Outlet_0. I suppose that merge0 stops pulling new elements and cannot understand why and how to fix it?
I would be very grateful for any recommendation.
Thank you in advance!

Cheers,
Sergii

Probably: https://doc.akka.io/docs/akka/2.5/stream/stream-graphs.html will helps.

The deadlock can occure when partit_0 backpressures and its choosen output is bp-ing too. So the element from part0 would go to part1. But the element from part1 want to go to the merge0. But merge0 want to choose the input from part3 (bcs it’s priority, and if multiple elements are ready it will choose depending on it’s inner state.)
I didn’t read the source of prioritized merge but I think something like this happening in your code. By the way; try to avoid this kind of cyclic graphs. Most of the time when you resolve your bp problems the next person who will touch the code will ruin it again :D

1 Like

Hi Gergo,

Thank you very much! You were right! I added buffers and it started working for some cases.
But now I have dilemma how to change the flow. Are there any best practices how to re-engineer cycles in flows?
Thanks.

Cheers,
Sergii

The buffers left your graph in an inconsistent state. I don’t advise you to use them.

What to use instead? It depends…

The best if you simply eliminate it with another approach. For ex. your graph has only partitions and mergers on this picture. If you have only maps beside those (for ex. on the flowback lanes) you can refactor the whole thing to a recursive function and a map. All the partitions can be translated to their inner ifs, and every map to their inner functions. If you have filters/repeaters too you can use mapconcat and return a list for all input elements. If you have async you lost this method go to the next one.

If you can’t eliminate it, you need to make it consistent! It could be hacky and suboptimal… For example because I know nearly nothing in this particular scope I would use keepAlive + filter combo. Keepalives before the merge, and filter after the merge. It will solve the deadlock issue you have right now. (But will slow down your graph instead of deadlock. If you know the scope and input characteristics this could work and much better then buffers.). But if you have any other alternative try to not do that.

If you brave enough and can locate your source of bp problem an another option is to write a custom stage which not have this problem. (Most of the time when I used cycles this was the actual problem solver.) The problem in this case the merger. Hacky alternative: write a resetable merger, and if one keepalive is kick in; reset the state in the prioritization logic. (This could solve the probably huge slowdown from the keepalive option.) Or “simply” write a merger for your concrete needs!

Ending thought: Just leave out the cycles if you can. :wink:

2 Likes

Hi Gergo,

Thank you very much for your advises! It seems I have found the solution that works for me. This flow has been executed via balancer, so it was async() and it means that a stage, after handing out an element to its downstream consumer is able to immediately process the next message. I just added inputBuffer to this flow and now it works, the only thing I need is to optimize is inputBuffer size.

Thank you very much!

Cheers,
Sergii