Parallel event processing by property value

Hi,

I have a ServiceA that publishes events to topic-a. ServiceB subscribes to topic-a so that it can process those incoming events.

I want ServiceB to handle multiple events at the same time, as long as those events are independent of each other.

For example, an event has this structure:

{
    "id" : "123",
    "payload" : {...}
}

Upon seeing this event, ServiceB gets its internal PersistentEntityRef, using the event’s “id” property value as the entity id.
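
Concretely, the lookup is along these lines (a sketch; AEntity, ACommand, and the injected PersistentEntityRegistry are placeholder names):

// Sketch: resolve the entity ref by the event's id.
PersistentEntityRef<ACommand> persistentEntityRef =
    persistentEntityRegistry.refFor(AEntity.class, aEvent.id);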

Now, ServiceB should handle the events coming in for id “123” sequentially. But it should also be able to handle events coming in for (for example) id “456” in parallel, because the two don’t affect each other.

What is the best way to achieve this?

We have a few ideas in mind, but we are not sure which is the best.

Option 0: Current code - mapAsync(1, aEvent -> persistentEntityRef.ask(command))

serviceA.topicA()
          .subscribe()
          .atLeastOnce(Flow.<AEvent>create()
              .mapAsync(1, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
              }
          ));

This is our current code. It works, but it can only process one event at a time: given two events, Event1 (with id “123”) and Event2 (with id “456”), this snippet finishes processing Event1 before it starts on Event2. What we would prefer is for Event1 to be processed without blocking Event2.

Option 1: Always return Done.getInstance()

serviceA.topicA()
          .subscribe()
          .atLeastOnce(Flow.<AEvent>create()
              .mapAsync(1, aEvent -> {
                // fire-and-forget: start the processing without waiting for it
                persistentEntityRef.ask(command);
                // acknowledge immediately, so the next event is never held up
                return CompletableFuture.completedFuture(Done.getInstance());
              }
          ));

This acknowledges every event immediately, so the stream never blocks the next incoming event. The problem is that we give up back-pressure: we can potentially choke if the event processing can’t keep up with the incoming events.

Option 2: Flow.create().mapAsync(someBigParallelismNumber, aEvent -> {})

serviceA.topicA()
          .subscribe()
          .atLeastOnce(Flow.<AEvent>create()
              .mapAsync(100, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
              }
          ));

We think this will achieve something similar to Option 1, except that incoming events are only blocked once a certain number are in flight (up to 100 in the code above). That way, we can limit things and not choke up and die. The problem, though, based on our understanding of the mapAsync javadoc, is that although events are emitted downstream in their original order, the processing of those events may not finish in that order. That is, given incoming events Event1 and Event2, Event2 may finish before Event1 (though the output of Event1 would still be emitted before the output of Event2). In our scenario, we want Event1 to be processed before Event2. (If we misunderstood how mapAsync() works, please let me know.)
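
To illustrate our reading of mapAsync, here is a toy sketch (illustrative only; assumes an ActorSystem and Materializer are in scope). The future for 2 completes first, yet 1 is still emitted first:

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import akka.stream.javadsl.Source;

// mapAsync(4, ...) starts both futures concurrently, but emits results in
// upstream order: 1 is printed before 2 even though 2 finishes processing first.
Source.from(Arrays.asList(1, 2))
    .mapAsync(4, n -> CompletableFuture.supplyAsync(() -> {
      try { Thread.sleep(n == 1 ? 200 : 10); } catch (InterruptedException e) { }
      return n;
    }))
    .runForeach(System.out::println, materializer);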

Option 3: Multiple subscriptions

for (String knownId : knownIds) {
  serviceA.topicA()
          .subscribe()
          .atLeastOnce(Flow.<AEvent>create()
              .filter(aEvent -> Objects.equals(knownId, aEvent.id))
              .mapAsync(1, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
              }
          ));
}
serviceA.topicA()
        .subscribe()
        .atLeastOnce(Flow.<AEvent>create()
            .filter(aEvent -> !knownIds.contains(aEvent.id))
            .mapAsync(1, aEvent -> {
              // do processing here
              return persistentEntityRef.ask(command);
            }
        ));

This one creates multiple subscribers for the same topic, where each subscriber is interested only in a particular “id” property value. This way, the subscriber for id “123” can process all events for “123” sequentially without affecting the events for “456” (and vice-versa). Our concern is that we are not even sure whether this would work, or whether it would bog down the system by creating too many subscribers.

Update
Option 4: Kafka partitions. Partitioned by AEvent::id

public interface ServiceA {
  @Override
  default Descriptor descriptor() {
    return named(SERVICE_NAME)
        .withTopics(
            topic(TOPIC_NAME, this::topicA)
                .withProperty(KafkaProperties.partitionKeyStrategy(), aEvent -> aEvent.id)
        )
        .withAutoAcl(true);
  }
}
public interface ServiceBUpstreamHandler {
...
      serviceA.topicA()
          .subscribe()
          .atLeastOnce(Flow.<AEvent>create()
              .mapAsync(1, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
              }
          ));
...
}

This would publish events partitioned by AEvent::id. What we are not sure of, though, is how the subscription side changes under this partitioning strategy. Will the subscriber start processing events in parallel by partition, or will it still process events sequentially regardless of partitioning?

Thoughts?

@franz

When designing a feature like this you need to start from Kafka’s functionality.
Kafka uses partitions per topic to allow parallel processing of each partition while preserving the order of events within a partition.
Parallel processing of a partitioned topic means having multiple subscriber instances in the same group, where Kafka is responsible for distributing each partition’s event stream to one subscriber instance in the group.
Lagom by default sets the groupId to the service name (defined in the service descriptor), so when doing a subscribe all subscribers will have the same groupId.
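
For reference, a subscriber can also set the groupId explicitly via Lagom’s Subscriber API (the group name and the flow variable below are illustrative):

// Optional: override the default groupId (the service name) explicitly.
serviceA.topicA()
    .subscribe()
    .withGroupId("service-b-group")
    .atLeastOnce(flow);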

In your case you should use the event id as the partition key.
I will comment on each of your options (numbered as in your post), assuming the event id is set as the Kafka topic partition key.

  0. No parallelism if only one instance is running. You could scale it horizontally by deploying multiple service instances, but in my experience scaling horizontally only to get parallel processing of topic messages is an overhead.
  1. No back-pressure, and also no guarantees that the order will be preserved. So no-no.
  2. This will allow parallel processing of the events but will not preserve order. Been there, done that :slight_smile:
  3. You would have to know exactly which ids you are processing, so it would either be a static list or you would have to dynamically maintain the list of ids and subscribe/unsubscribe. There is also an overhead in having all these subscriber instances because, I believe, you will not have a situation where there are always events for each partition. Also, the number of subscribers is not controllable because it depends on the number of ids (if the list of ids is maintained dynamically).
  4. This will work the same as Option 0.

So for parallel processing of topic partitions you need multiple subscribers.
Having a subscriber instance per partition to achieve maximum parallelism (as in Option 3) is an overhead, and its scaling is not controllable, so the solution is to have a pre-configured count of subscribers.
This pre-configured count then allows you to scale vertically (just restart the service with a different count) and horizontally (add service instances). Just note that horizontal scaling multiplies the total number of subscribers (count × instances).
Code would look something like this:

IntStream.range(0, count).forEach(i -> {
  serviceA.topicA()
          .subscribe()
          .atLeastOnce(Flow.<AEvent>create()
              .mapAsync(1, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
              }
          ));
});
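
One refinement (a sketch; the config key below is made up): read the count from configuration, so that a vertical scale is just a config change plus a restart:

// Illustrative: read the subscriber count from application.conf,
// assuming a com.typesafe.config.Config instance is injected.
int count = config.getInt("service-b.topic-a.subscriber-count");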

Thanks @aklikic! Excellent response! :slight_smile:

So to verify my understanding of your recommended solution: I need to update the publishing side to partition by event id, and update the subscribing side to create multiple subscribers.

public interface ServiceA {
  @Override
  default Descriptor descriptor() {
    return named(SERVICE_NAME)
        .withTopics(
            topic(TOPIC_NAME, this::topicA)
                .withProperty(KafkaProperties.partitionKeyStrategy(), aEvent -> aEvent.id)
        )
        .withAutoAcl(true);
  }
}
IntStream.range(0, count).forEach(i -> {
  serviceA.topicA()
          .subscribe()
          .atLeastOnce(Flow.<AEvent>create()
              .mapAsync(1, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
              }
          ));
});

@franz yes :slight_smile:

Excellent! thanks @aklikic!! :smiley:

(apologies if I’m stating the obvious)

Please note this will parallelise processing only if the number of partitions is also increased on the Kafka brokers. It doesn’t matter that you have 100 subscribers and partition per entity id if the Kafka topic is created with only ONE partition. A Lagom subscriber is no different from a plain Kafka consumer in that two subscribers in the same group will not consume the same partition at the same time.
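
For example, the topic can be created up front with the desired partition count using Kafka’s Java AdminClient (broker address, partition count, and replication factor below are illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
  // topic-a with 10 partitions and replication factor 1 (illustrative values)
  admin.createTopics(Collections.singletonList(new NewTopic("topic-a", 10, (short) 1)))
      .all().get(); // blocks until created (checked exceptions elided)
}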


BTW, thanks @aklikic for sharing your expertise in a great answer!