Hi,
I have a ServiceA that publishes events to topic-a, and a ServiceB that subscribes to topic-a to process those incoming events.
I want ServiceB to handle multiple events at the same time, as long as those events are independent of each other.
For example, an event has this structure:
{
  "id": "123",
  "payload": {...}
}
ServiceB, upon seeing an event, looks up its internal PersistentEntityRef using the event’s “id” property value as the ref id.
Now, ServiceB should handle the events coming in for id “123” sequentially. But it should also be able to handle events coming in for, say, id “456” in parallel, because the two don’t affect each other.
What is the best way to achieve this?
We have a few ideas in mind, but we are not sure which is the best.
Option 0: Current code - mapAsync(1, aEvent -> return persistentEntityRef.ask(command))
serviceA.topicA()
    .subscribe()
    .atLeastOnce(Flow.<AEvent>create()
        .mapAsync(1, aEvent -> {
            // do processing here
            return persistentEntityRef.ask(command);
        })
    );
This is our current codebase. It works, but it can only process one event at a time. That is, given two events — Event1 (with id “123”) and Event2 (with id “456”) — this snippet finishes Event1 before it starts Event2. What we would prefer is for Event1 to be processed without blocking Event2.
Option 1: Always return Done.getInstance()
serviceA.topicA()
    .subscribe()
    .atLeastOnce(Flow.<AEvent>create()
        .mapAsync(1, aEvent -> {
            // do processing here
            return CompletableFuture.completedFuture(Done.getInstance());
        })
    );
This would let us acknowledge every event immediately, so no incoming event is ever blocked. The problem is that we can potentially choke if the event processing can’t keep up with the incoming events, because nothing bounds the work in flight.
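To make the choking concern concrete, here is a plain-Java sketch (not Lagom code; the single-threaded executor stands in for the entity, and the `pendingAfterSubmitting` helper is our own name) of what happens when the subscriber acks faster than the entity can work:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FireAndForgetBacklog {
    // Simulate Option 1: the subscriber acks each event immediately and
    // hands the real work to a single-threaded "entity" without waiting.
    static int pendingAfterSubmitting(int events) {
        ThreadPoolExecutor entity = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < events; i++) {
            entity.submit(() -> {
                try { Thread.sleep(20); } catch (InterruptedException e) { }
            });
            // Option 1 "returns Done" here, so the stream pulls the next
            // event no matter how far behind the entity is.
        }
        int pending = entity.getQueue().size(); // the unbounded backlog
        entity.shutdownNow();
        return pending;
    }

    public static void main(String[] args) {
        System.out.println("pending = " + pendingAfterSubmitting(100));
    }
}
```

Because acking outpaces processing, almost all of the 100 events are still queued when the loop finishes — that queue is exactly what at-least-once backpressure normally keeps bounded.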
Option 2: Flow.create().mapAsync(someBigParallelismNumber, aEvent -> {})
serviceA.topicA()
    .subscribe()
    .atLeastOnce(Flow.<AEvent>create()
        .mapAsync(100, aEvent -> {
            // do processing here
            return persistentEntityRef.ask(command);
        })
    );
We think this achieves something similar to Option 1 in that incoming events are not blocked, but only up to a limit (100 in the code above), so things are bounded and we won’t choke and die. The problem, based on our understanding of the mapAsync javadoc, is that although the outputs are emitted downstream in the original order, the processing of those events may not finish in that order. That is, given incoming events Event1 and Event2, Event2 may finish before Event1 (even though Event1’s output is emitted before Event2’s). In our scenario, we need Event1 to be fully processed before Event2. (If we misunderstood how mapAsync() works, please let us know.)
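Our reading of mapAsync can be illustrated without Akka at all. This plain-Java sketch (the class and method names are ours, not any library’s) mimics what mapAsync(2, ...) does with two in-flight futures — both run concurrently, results are emitted in upstream order, but completion order is whatever it is:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MapAsyncOrderDemo {
    // Mimics mapAsync(2, ...): both futures are started concurrently,
    // but results are emitted downstream in upstream order.
    static List<List<String>> run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<String> event1 = CompletableFuture.supplyAsync(() -> {
            sleep(300); // Event1 is slow to process
            completed.add("Event1");
            return "Event1";
        }, pool);
        CompletableFuture<String> event2 = CompletableFuture.supplyAsync(() -> {
            sleep(50);  // Event2 is fast to process
            completed.add("Event2");
            return "Event2";
        }, pool);
        // Emission order is fixed by upstream order, not completion order.
        List<String> emitted = List.of(event1.join(), event2.join());
        pool.shutdown();
        return List.of(emitted, List.copyOf(completed));
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { }
    }

    public static void main(String[] args) {
        List<List<String>> result = run();
        System.out.println("emitted:   " + result.get(0));
        System.out.println("completed: " + result.get(1));
    }
}
```

The emitted list is always [Event1, Event2], while the completion list is (almost certainly, given the sleeps) [Event2, Event1] — which is exactly the interleaving that breaks the per-id ordering requirement.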
Option 3: Multiple subscriptions
for (String knownId : knownIds) {
    serviceA.topicA()
        .subscribe()
        .atLeastOnce(Flow.<AEvent>create()
            .filter(aEvent -> Objects.equals(knownId, aEvent.id))
            .mapAsync(1, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
            })
        );
}

// catch-all subscriber for ids we do not know about
serviceA.topicA()
    .subscribe()
    .atLeastOnce(Flow.<AEvent>create()
        .filter(aEvent -> !knownIds.contains(aEvent.id))
        .mapAsync(1, aEvent -> {
            // do processing here
            return persistentEntityRef.ask(command);
        })
    );
This one creates multiple subscribers to the same topic, each interested only in a particular “id” value. That way, the subscriber for id “123” can process all “123” events sequentially without affecting the events for “456” (and vice versa). Our concern is that we are not even sure whether this would work, or whether it would bog down the system by creating too many subscribers.
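The per-id sequencing this option is after can be sketched outside of streams entirely: one single-threaded queue per id, so same-id work runs in order while different ids run in parallel. This is a hypothetical illustration (our own class, not Lagom API), just to pin down the desired semantics:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerIdSequencer {
    // One single-threaded executor per id: events with the same id are
    // processed strictly in submission order; different ids in parallel.
    private final ConcurrentMap<String, ExecutorService> queues = new ConcurrentHashMap<>();

    CompletableFuture<Void> submit(String id, Runnable work) {
        ExecutorService queue =
                queues.computeIfAbsent(id, k -> Executors.newSingleThreadExecutor());
        return CompletableFuture.runAsync(work, queue);
    }

    void shutdown() {
        queues.values().forEach(ExecutorService::shutdownNow);
    }

    static List<String> demo() {
        PerIdSequencer sequencer = new PerIdSequencer();
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        // Three events for id "123" must run in order; "456" is independent.
        CompletableFuture.allOf(
                sequencer.submit("123", () -> order.add("123-a")),
                sequencer.submit("123", () -> order.add("123-b")),
                sequencer.submit("456", () -> order.add("456-a")),
                sequencer.submit("123", () -> order.add("123-c"))
        ).join();
        sequencer.shutdown();
        return List.copyOf(order);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Whatever mechanism we end up with (multiple subscribers, groupBy, or Kafka partitions) should give the same guarantee: “123-a”, “123-b”, “123-c” always observed in that relative order, with “456-a” free to land anywhere.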
Update
Option 4: Kafka partitions. Partitioned by AEvent::id
public interface ServiceA {
    @Override
    default Descriptor descriptor() {
        return named(SERVICE_NAME)
            .withTopics(
                topic(TOPIC_NAME, this::topicA)
                    .withProperty(KafkaProperties.partitionKeyStrategy(), AEvent::id)
            )
            .withAutoAcl(true);
    }
}
public interface ServiceBUpstreamHandler {
    ...
    serviceA.topicA()
        .subscribe()
        .atLeastOnce(Flow.<AEvent>create()
            .mapAsync(1, aEvent -> {
                // do processing here
                return persistentEntityRef.ask(command);
            })
        );
    ...
}
This would publish events partitioned by AEvent::id. What we are not sure of, though, is how the subscription behaves under this partition strategy. Will the subscriber start processing events in parallel by partition, or will it still process events sequentially regardless of partitioning?
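For reference, key-based partitioning just means the target partition is a deterministic function of the key, so every event for a given id lands in the same partition and is therefore seen in order by whichever consumer owns that partition. A conceptual sketch (Kafka’s real default partitioner uses murmur2 on the serialized key; the hashCode here is only a stand-in):

```java
public class PartitionSketch {
    // Conceptual stand-in for a key-based partitioner: the partition is a
    // pure function of the key, so all events for one id share a partition.
    static int partitionFor(String partitionKey, int numPartitions) {
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Events for id "123" always map to one partition, so a single
        // consumer sees them in order; id "456" may map to a different
        // partition and be consumed in parallel by another instance.
        System.out.println("id 123 -> partition " + partitionFor("123", 10));
        System.out.println("id 456 -> partition " + partitionFor("456", 10));
    }
}
```

Note this only ensures per-id ordering across subscriber *instances*; whether a single subscriber instance interleaves events from the several partitions it owns is exactly the open question above.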
Thoughts?