Akka Projection - How many ShardedDaemons? Best practices?

Hello!

Currently we have only one aggregate in our system (call it AggregateA) and one guardian actor, and everything works there. We want to add another aggregate (call it AggregateB) and a projection that handles events from AggregateB. Note that we are using Akka Persistence with CQRS here.

My question is: should we initialize a new ShardedDaemonProcess for the new projection? If so, should the two projections use different processors? As an aside, should my aggregates (and thus my projections) run off of different guardian actors and seed nodes? I’ve attached my proposed code for AggregateBProjection; I’m wondering whether it breaks down anywhere, as I’m a little confused.

Aggregate A Projection

public class AggregateAProjection {
  public static void init(ActorSystem<?> system) {
    if (Cluster.get(system).selfMember().hasRole("read-model")) {
      ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(system);
      ShardedDaemonProcessSettings shardedDaemonProcessSettings =
          ShardedDaemonProcessSettings.create(system)
              .withShardingSettings(shardingSettings.withRole("read-model"));

      // Allows us to run N actors, where N = parallelism
      ShardedDaemonProcess.get(system).init(
          ProjectionBehavior.Command.class,
          "AggregateAProcessors",
          4,
          index -> ProjectionBehavior.create(createProjectionFor(system, index)),
          shardedDaemonProcessSettings,
          Optional.of(ProjectionBehavior.stopMessage())
      );
    }
  }

  static AtLeastOnceCassandraProjection<EventEnvelope<Message>> createProjectionFor(
      ActorSystem<?> system,
      int index) {
    String tag = "aggregateA-slice-" + index;
    SourceProvider<Offset, EventEnvelope<Message>> sourceProvider = EventSourcedProvider
        .eventsByTag(
            system,
            CassandraReadJournal.Identifier(),
            tag
        );
    int saveOffsetAfterEnvelopes = 100;
    Duration saveOffsetAfterDuration = Duration.ofMillis(500);

    return CassandraProjection.atLeastOnce(
        ProjectionId.of("event_projections", tag),
        sourceProvider,
        new AggregateAHandler(system, tag)
    ).withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
  }
}

Aggregate B Projection

public class AggregateBProjection {
  public static void init(ActorSystem<?> system) {
    if (Cluster.get(system).selfMember().hasRole("read-model")) {
      ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(system);
      ShardedDaemonProcessSettings shardedDaemonProcessSettings =
          ShardedDaemonProcessSettings.create(system)
              .withShardingSettings(shardingSettings.withRole("read-model"));

      // Allows us to run N actors, where N = parallelism
      ShardedDaemonProcess.get(system).init(
          ProjectionBehavior.Command.class,
          "AggregateBProcessors",
          4,
          index -> ProjectionBehavior.create(createProjectionFor(system, index)),
          shardedDaemonProcessSettings,
          Optional.of(ProjectionBehavior.stopMessage())
      );
    }
  }

  static AtLeastOnceCassandraProjection<EventEnvelope<Message>> createProjectionFor(
      ActorSystem<?> system,
      int index) {
    String tag = "aggregateB-slice-" + index;
    SourceProvider<Offset, EventEnvelope<Message>> sourceProvider = EventSourcedProvider
        .eventsByTag(
            system,
            CassandraReadJournal.Identifier(),
            tag
        );
    int saveOffsetAfterEnvelopes = 100;
    Duration saveOffsetAfterDuration = Duration.ofMillis(500);

    return CassandraProjection.atLeastOnce(
        ProjectionId.of("event_projections", tag),
        sourceProvider,
        new AggregateBHandler(system, tag)
    ).withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
  }
}

That looks right. Different projections and handlers for the different aggregates. One ShardedDaemonProcess per projection type.

I would probably use different names in the ProjectionId. Right now you use "event_projections" for both. The ids are still unique because the tags differ, but it would be clearer and more robust to use ProjectionId.of("AggregateAProjection", tag) and ProjectionId.of("AggregateBProjection", tag).
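
To illustrate why distinct names are more robust, here is a minimal stdlib-only sketch. It does not use Akka at all: the class ProjectionIdNaming and its methods are hypothetical, and it assumes a projection id is identified by the pair (name, key), modeled here as a plain string. It counts how many distinct ids two projection types end up with under different naming choices.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class ProjectionIdNaming {

  // Hypothetical stand-in for a projection id: the (name, key) pair as one string.
  static String id(String name, String key) {
    return name + "|" + key;
  }

  // One id per tag slice for a single projection type.
  static Set<String> idsFor(String projectionName, String tagPrefix, int parallelism) {
    Set<String> ids = new LinkedHashSet<>();
    for (int index = 0; index < parallelism; index++) {
      ids.add(id(projectionName, tagPrefix + index));
    }
    return ids;
  }

  // Number of distinct ids across two projection types.
  static int distinctIds(String nameA, String prefixA,
                         String nameB, String prefixB, int parallelism) {
    Set<String> all = new LinkedHashSet<>(idsFor(nameA, prefixA, parallelism));
    all.addAll(idsFor(nameB, prefixB, parallelism));
    return all.size();
  }

  public static void main(String[] args) {
    // Shared name, different tag prefixes: unique only because the tags differ.
    System.out.println(distinctIds(
        "event_projections", "aggregateA-slice-",
        "event_projections", "aggregateB-slice-", 4));

    // Shared name and (by mistake) the same tag prefix: the ids collide.
    System.out.println(distinctIds(
        "event_projections", "slice-",
        "event_projections", "slice-", 4));

    // Distinct names: unique even if the tag prefixes happen to match.
    System.out.println(distinctIds(
        "AggregateAProjection", "slice-",
        "AggregateBProjection", "slice-", 4));
  }
}
```

With a shared name, uniqueness depends entirely on the tags never overlapping. With one name per projection type, the ids stay distinct regardless of how the tags are chosen.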
