Akka routing -- clusters

Hello,
I am running actors in a cluster. I was following an example, where each worker actor is started in a separate actor system. The actor then registers with a master actor that waits for the number of workers to reach a threshold and then starts distributing the work among the worker nodes.
In a different thread, I was advised to run all the actors within the same actor system. Therefore, I start an actor system for each node in my cluster and this works fine.

Now, I also want to use multiple routers. So I group the actors and each group will have a router. I am using ConsistentHashingGroup.
Currently, the master is creating the routers. However, it is slowing down the application. Using actors without routers finishes in around a minute. When I used routers (5 routees), the time to process the same size of data takes 35 minutes.

My question: I assumed that the latency when I use the routers is caused by the actor system running the master actor becoming a bottleneck. Is it better to start an actor on each node in the cluster (and hence on each actor system)? If that is the case, how can I make the router register so that the master actor knows about it?
I have been trying to find more examples about routers and how to use them in a cluster, but I could not.

Regarding the scale of my current application (I am willing to scale out but seem to run into more problems when I try):
– 1 master actor
– 5 workers (when I use a router, then I multiply them by 5). workers are organized in a tree and a message received by any leaf (actor) is sent to other actors (ancestors) in that tree
– more than 2 million messages are processed; sending them to the worker actors that are leaves in the above hierarchy takes less than 20 seconds.

Thanks

That indeed sounds good.

Why?

So the work is still being distributed by 1 master actor in your cluster? Where does the ‘work’ come from?

That indeed seems suspicious.

Do you mean the leaf actors do the work and then send the result of the work to their ancestor? Where does the result of the work eventually go?

Thanks Arnout for your reply.
I think I had better give a brief overview of what I am doing. In my application, I have a tree, where each node in that tree maintains some data and does processing on this data. Initially, I had a centralized version of this application, in which updates are streamed and each node in the tree updates its data. However, I wanted to simultaneously process multiple updates and also to divide the processing at each node. So the design I used for this scaling out was to create one actor per node and have the nodes communicate via messaging. To achieve the second objective, which is to divide the processing at each node, I am using routers.

raboof (Arnout Engelen, Akka Team), May 1:

ielghand:
I start an actor system for each node in my cluster and this works fine.

That indeed sounds good.

Now, I also want to use multiple routers.

Why?

Because each group of actors performs a different function. I am using a router for each group.

Currently, the master is creating the routers.

So the work is still being distributed by 1 master actor in your cluster? Where does the ‘work’ come from?

Yes, there is only one master. For now I am reading the work from a file. However, the plan is to stream it through, for example, Kafka.

Using actors without routers finishes in around a minute. When I used routers (5 routees), the time to process the same size of data takes 35 minutes.

That indeed seems suspicious.

:( I am trying to debug this and I was not sure if it is a configuration issue or what.

5 workers (when I use a router, then I multiply them by 5). workers are organized in a tree and a message received by any leaf (actor) is sent to other actors (ancestors) in that tree

Do you mean the leaf actors do the work and then send the result of the work to their ancestor? Where does the result of the work eventually go?

Yes, as described in the overview of the application above, updating the tree starts at the leaf nodes and is propagated to other nodes until the root is reached. Since each node in the tree is now an actor, the actors playing the role of leaf nodes are the ones that receive messages from the master, and they send messages to their parents.

No output is generated for each update; my goal is to keep the data in the tree updated. When I want to generate output, I compile it from the data stored at each node (actor).

Thanks

Wait, by ‘node’ do you mean one point in your tree or one JVM in your cluster?

For each piece of work, do you know in advance in which subtree it will fall, or do you only know the address of the specific tree node? If you know the full ‘path’, a solution might be to use Cluster Sharding to shard the tree nodes in the ‘top layer’ of your tree, and then have each complete subtree live on the same cluster node.

Hmm, is a ‘group’ of actors also a ‘subtree’ of your tree? I feel like I’m still not getting a clear picture of your system.

raboof (Arnout Engelen, Akka Team), May 1:

ielghand:
I have a tree, where each node in that tree maintains some data and does processing on this data. (…) I wanted to simultaneously process multiple updates and also to divide the processing at each node. So the design I used for this scaling out was to create one actor per node and have the nodes communicate via messaging.

Wait, by ‘node’ do you mean one point in your tree or one JVM in your cluster?

"Node" in the above description means a point in the tree. In the design, I map this point in the tree to an actor, or to a set of actors if I am using a router.

For each piece of work, do you know in advance in which subtree it will fall, or do you only know the address of the specific tree node? If you know the full ‘path’, a solution might be to use Cluster Sharding to shard the tree nodes in the ‘top layer’ of your tree, and then have each complete subtree live on the same cluster node.

Yes, I keep track of all the created actors and their mapping to the tree points. When an update (piece of work) is received (or read from the file), I know which point of the tree should be updated, and therefore, can send a message to the actor representing this point of the tree.

I have not looked into the details of Cluster Sharding yet, but I will consider it.

each group of actors performs a different function. I am using a router for each group.

Hmm, is a ‘group’ of actors also a ‘subtree’ of your tree? I feel like I’m still not getting a clear picture of your system.

Yes, each group of actors maintains a subtree of the tree (for simplicity, I am assuming a subtree is a point/node in the tree).

So my application is based on a tree. For example:

    x
  y   z
w   v

  • In the simplest form, I create 5 actors and assign to each of them the responsibility of maintaining the data stored in one of the tree points, namely x, y, z, w, and v. So I will have an actor for x, an actor for y, and so on.

  • When I want to scale out, I use a router for each point in that tree (5 routers in this case) and assign routees to each router. I use hash partitioning to divide the data at each point of the tree so that the actors assigned to one router act independently of each other. In this solution, I will have a router for x, a router for y, and so on.

  • When an update starts at w, I create a message and send it to the actor representing w (or to its router in the solution with routers). Actor w finishes updating its data and sends a message to actor y, which also finishes updating its data and sends a message to x. When x finishes updating its locally stored data, this piece of work is considered complete.

  • Each actor representing a point in the tree knows the ActorRefs of its parent and its children.
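
To make the scheme in the list above concrete, here is a minimal single-threaded sketch in Python. It is not Akka code and not the code from my repository: the names PARENT, ROUTEES_PER_POINT, routee_for and apply_update are made up for illustration, the placement of w and v under y is an assumption about the example tree, and the simple modulo hash only stands in for the consistent hashing that ConsistentHashingGroup performs.

```python
import zlib
from collections import defaultdict

# Assumed shape of the example tree: x is the root, y and z are its children,
# and w and v are taken to be children of y (an assumption for illustration).
PARENT = {"x": None, "y": "x", "z": "x", "w": "y", "v": "y"}

# Hypothetical number of routees behind the router of each tree point.
ROUTEES_PER_POINT = 5


def routee_for(key: str, n: int = ROUTEES_PER_POINT) -> int:
    """Pick a routee index from a stable hash of the message key.

    Plain modulo hashing, used here as a stand-in for a consistent-hash ring.
    """
    return zlib.crc32(key.encode("utf-8")) % n


# state[(point, routee_index)] is the data owned by one routee of one tree point.
state = defaultdict(dict)


def apply_update(point: str, key: str, value) -> list:
    """Apply an update at `point`, then propagate it to every ancestor.

    Returns the list of (point, routee_index) pairs that handled the update,
    in the order they were visited (start point first, root last).
    """
    path = []
    while point is not None:
        idx = routee_for(key)          # same key -> same routee index at every level
        state[(point, idx)][key] = value
        path.append((point, idx))
        point = PARENT[point]          # move up towards the root
    return path


if __name__ == "__main__":
    # An update that starts at w visits w, then y, then x.
    print(apply_update("w", "some-key", 1))
```

Because the routee is chosen from the key alone, all updates for one key are handled by the same routee at each tree point, which is what lets the routees of one router work independently.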

To clarify my question, I have created this simple example to show the overhead introduced by the routers that I create: https://github.com/ami07/AKKARoutersExample
My question: what am I doing wrong in the configuration that is causing this huge overhead with the routers? Also, when I use routing in the cluster, a good number of messages seem to be lost.

The example reads "what is supposed to be" a stream of tuples from a file, divides them into three sets, and sends the tuples in each set to an actor (one tuple at a time). When I am using routers, a tuple is hashed, based on a key in the tuple, to the corresponding worker actor.
The three sets are not equal in size: they contain 2,000,497, 266,664, and 3,333 tuples, respectively.

I am using the same worker actor and comparing three cases:

  • running all the worker actors locally - no routers, therefore I only create three actors, each responsible for one set in the data.
  • creating workers locally as a pool for a router. I have three routers, each responsible for one set in the data.
  • clustering: a master node creates 3 routers and assigns a group of created actors to each of them. I have tried both running the master and all the actors on the same machine and running the master on a separate machine.

Summary of the results (machine: 8 cores/16 threads, Intel Xeon E5520 @ 2.27GHz, 24GB RAM). For each case I report four times: the time to iterate through the lines of the file and send the messages to the actors/routers, followed by the time to finish processing the messages of the largest, middle, and smallest sets.

  • Local - no routing: 13463 ms, 18137 ms, 6010 ms, 3325 ms
  • Local - routing pool: 18116 ms, 21114 ms, 7708 ms, 3392 ms
  • Cluster — workers and master on same machine: 32795 ms, 231323 ms, 74691 ms, 6625 ms
  • Cluster — workers run on a different machine: 18316 ms, 215817 ms, 73254 ms, 5842 ms
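
To put the summary in perspective, the following small Python sketch computes how many times slower each case is than the local run without routing. The numbers are copied from the list above; the names RESULTS and slowdown are made up for this illustration.

```python
# Times in ms, in the order: sending, largest set, middle set, smallest set.
RESULTS = {
    "local, no routing": (13463, 18137, 6010, 3325),
    "local, routing pool": (18116, 21114, 7708, 3392),
    "cluster, same machine": (32795, 231323, 74691, 6625),
    "cluster, different machine": (18316, 215817, 73254, 5842),
}

BASELINE = RESULTS["local, no routing"]


def slowdown(case: str) -> tuple:
    """Return each time of `case` divided by the matching baseline time."""
    return tuple(round(t / b, 2) for t, b in zip(RESULTS[case], BASELINE))


if __name__ == "__main__":
    for case in RESULTS:
        print(case, slowdown(case))
```

For the two larger sets, both cluster runs come out roughly 12 times slower than the local run without routing, while the local routing pool stays within about 30% of it.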

Details of the results:
local - no routing:
reading the file and sending messages to actors took 13463 ms
[akka://AKKANoRouterLocal/user/actorStoreS] S :finished processing 3333 taking 3325 ms , throughput so far: 1111.0 (msg/sec)
[akka://AKKANoRouterLocal/user/actorStorePS] PS :finished processing 266664 taking 6010 ms , throughput so far: 44444.0 (msg/sec)
[akka://AKKANoRouterLocal/user/actorStoreL] L :finished processing 2000497 taking 18137 ms , throughput so far: 111138.72 (msg/sec)

Local - routing pool
reading the file and sending messages to actors took 18116 ms
[akka://AKKARouterLocal/user/simpleHashPoolRouterL/$a] L :finished processing 215442 taking 21114 ms , throughput so far: 10259.143 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterL/$c] L :finished processing 592752 taking 21112 ms , throughput so far: 28226.285 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterL/$d] L :finished processing 314943 taking 21112 ms , throughput so far: 14997.286 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterL/$b] L :finished processing 366837 taking 21114 ms , throughput so far: 17468.428 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterL/$e] L :finished processing 510523 taking 21111 ms , throughput so far: 24310.62 (msg/sec)

[akka://AKKARouterLocal/user/simpleHashPoolRouterPS/$a] PS :finished processing 62961 taking 7708 ms , throughput so far: 8994.429 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterPS/$e] PS :finished processing 14264 taking 7707 ms , throughput so far: 2037.7142 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterPS/$c] PS :finished processing 55902 taking 7708 ms , throughput so far: 7986.0 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterPS/$b] PS :finished processing 40749 taking 7708 ms , throughput so far: 5821.2856 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterPS/$d] PS :finished processing 92788 taking 7708 ms , throughput so far: 13255.429 (msg/sec)

[akka://AKKARouterLocal/user/simpleHashPoolRouterS/$c] S :finished processing 1327 taking 3390 ms , throughput so far: 442.33334 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterS/$d] S :finished processing 621 taking 3390 ms , throughput so far: 207.0 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterS/$e] S :finished processing 300 taking 3388 ms , throughput so far: 100.0 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterS/$a] S :finished processing 798 taking 3392 ms , throughput so far: 266.0 (msg/sec)
[akka://AKKARouterLocal/user/simpleHashPoolRouterS/$b] S :finished processing 287 taking 3390 ms , throughput so far: 95.666664 (msg/sec)

Cluster — workers and master on same machine
reading the file and sending messages to actors took 32795 ms
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:0] L :finished processing 370196 taking 231323 ms , throughput so far: 1602.5801 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:14] L :finished processing 416094 taking 231288 ms , throughput so far: 1801.2727 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:4] L :finished processing 230499 taking 231302 ms , throughput so far: 997.8312 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:5] L :finished processing 494563 taking 231291 ms , throughput so far: 2140.9653 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:6] L :finished processing 489045 taking 231301 ms , throughput so far: 2117.078 (msg/sec)
>>>> Total messages processed by L: 2000397 (should be: 2000497)
>>>> avg time: 231301 ms ~ 3.85 min

[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:10] PS :finished processing 42296 taking 74690 ms , throughput so far: 571.56757 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:13] PS :finished processing 63949 taking 74690 ms , throughput so far: 864.17566 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:7] PS :finished processing 68325 taking 74691 ms , throughput so far: 923.3108 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:1] PS :finished processing 38353 taking 74721 ms , throughput so far: 518.2838 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:8] PS :finished processing 53629 taking 74691 ms , throughput so far: 724.7162 (msg/sec)
>>>> Total messages processed by PS: 266552 (should be 266664)
>>>> avg time: 74696.6 ms ~ 1.24 min

[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:2] S :finished processing 560 taking 6624 ms , throughput so far: 93.333336 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:3] S :finished processing 837 taking 6625 ms , throughput so far: 139.5 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:11] S :finished processing 566 taking 6607 ms , throughput so far: 94.333336 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:12] S :finished processing 762 taking 6609 ms , throughput so far: 127.0 (msg/sec)
[akka://AKKARouterCluster@127.0.0.1:2555/user/worker:9] S :finished processing 608 taking 6606 ms , throughput so far: 101.333336 (msg/sec)
>>>> Total messages processed by S: 3333 (ok)
>>>> avg time: 6.6 s

Cluster — workers run on a different machine

reading the file and sending messages to actors took 18316 ms
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:13] L :finished processing 350960 taking 215796 ms , throughput so far: 1632.3721 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:2] L :finished processing 275857 taking 215815 ms , throughput so far: 1283.0558 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:9] L :finished processing 380401 taking 215808 ms , throughput so far: 1769.307 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:1] L :finished processing 486638 taking 215817 ms , throughput so far: 2263.4326 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:3] L :finished processing 506546 taking 215816 ms , throughput so far: 2356.0278 (msg/sec)
>>>> Total messages processed by L: 2000402 (should be: 2000497)
>>>> avg time: 215808 ms ~ 3.6 min

[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:5] PS :finished processing 45536 taking 73251 ms , throughput so far: 623.7808 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:8] PS :finished processing 36714 taking 73245 ms , throughput so far: 502.93152 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:0] PS :finished processing 86270 taking 73254 ms , throughput so far: 1181.7808 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:11] PS :finished processing 30086 taking 73233 ms , throughput so far: 412.137 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:4] PS :finished processing 67942 taking 73253 ms , throughput so far: 930.71234 (msg/sec)
>>>> Total messages processed by PS: 266548 (should be 266664)
>>>> avg time: 73247.2 ms ~ 1.22 min

[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:12] S :finished processing 575 taking 5829 ms , throughput so far: 115.0 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:14] S :finished processing 767 taking 5829 ms , throughput so far: 153.4 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:6] S :finished processing 678 taking 5842 ms , throughput so far: 135.6 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:7] S :finished processing 629 taking 5842 ms , throughput so far: 125.8 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:10] S :finished processing 684 taking 5831 ms , throughput so far: 136.8 (msg/sec)
>>>> Total messages processed by S: 3333 (ok)
>>>> avg time: 5.8 s
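
The totals and the number of missing messages can be recomputed from the log lines themselves. Below is a minimal Python sketch that does this for the five L workers of the last run (workers on a different machine). The names LOG, LINE, EXPECTED and summarize are made up for this illustration; the log lines are copied from above.

```python
import re

# Log lines of the L workers from the run with workers on a different machine.
LOG = """\
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:13] L :finished processing 350960 taking 215796 ms , throughput so far: 1632.3721 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:2] L :finished processing 275857 taking 215815 ms , throughput so far: 1283.0558 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:9] L :finished processing 380401 taking 215808 ms , throughput so far: 1769.307 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:1] L :finished processing 486638 taking 215817 ms , throughput so far: 2263.4326 (msg/sec)
[akka://AKKARouterCluster@10.55.0.153:2555/user/worker:3] L :finished processing 506546 taking 215816 ms , throughput so far: 2356.0278 (msg/sec)
"""

# Captures the set name, the processed count and the elapsed time in ms.
LINE = re.compile(r"\] (\w+) :finished processing (\d+) taking (\d+) ms")

# Number of tuples that were sent for each set.
EXPECTED = {"L": 2000497}


def summarize(log: str) -> dict:
    """Return {set name: (total processed, longest elapsed time in ms)}."""
    totals = {}
    for match in LINE.finditer(log):
        name = match.group(1)
        count, elapsed_ms = int(match.group(2)), int(match.group(3))
        total, longest = totals.get(name, (0, 0))
        totals[name] = (total + count, max(longest, elapsed_ms))
    return totals


if __name__ == "__main__":
    for name, (total, longest_ms) in summarize(LOG).items():
        missing = EXPECTED[name] - total
        rate = total / (longest_ms / 1000)  # messages per second, at ms precision
        print(f"{name}: processed={total} missing={missing} rate={rate:.1f} msg/sec")
```

For this run, the sketch gives 2000402 processed messages, that is, 95 fewer than the 2000497 that were sent. Note also that the per-worker "throughput so far" values in the logs appear to be computed with the elapsed time truncated to whole seconds (for example, 350960 / 215 = 1632.37), so they are slightly higher than a millisecond-precision rate. Since Akka only guarantees at-most-once delivery, some loss between actor systems is possible in principle, but I do not know whether that explains the gap here.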

Thanks

I am still confused about the proper setup of routers and the best way to do it. I would appreciate any pointers regarding what I am doing wrong, as well as a good example to follow.
Thanks