Best practice for eventual consistency when creating 2 related Aggregate Roots

Dear all,

Imagine I’m implementing a Customer entity and an Invoice entity.
Both are event-sourced entities.

What would be the best practice to check when creating an Invoice that the associated Customer exists?

I’m trying with Reply and Effects, but I don’t know how to emit the event.

See this code:

@EventSourcedEntity(entityType = "invoice")
public class InvoiceEntity {

    private String invoiceId;
    private String customerId;

    @CommandHandler
    public void createInvoice(InvoiceAPI.CreateInvoiceCommand createInvoiceCommand, CommandContext ctx) {

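        // Build the request for the customer lookup: the call takes the command message, not the response.
        CustomerAPI.GetCustomerCommand getCustomerCommand = CustomerAPI.GetCustomerCommand.newBuilder()
                .setCustomerId(createInvoiceCommand.getCustomerId())
                .build();

        // "GetAccount" is the rpc name declared in CustomerService.
        Reply.forward(ctx.serviceCallFactory().lookup("com.customer.CustomerService", "GetAccount", CustomerAPI.GetCustomerCommand.class)
                .createCall(getCustomerCommand)).addEffects(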
                /* How to invoke the EventHandler? This does not compile:
                        Effect.of(
                        ctx.emit(
                            InvoiceDomain.InvoiceCreated.newBuilder()
                                .setCustomerId(createInvoiceCommand.getCustomerId())
                            .build()))
        */);

    }
    @EventHandler
    public void accountAssociated(@SuppressWarnings("unused")InvoiceDomain.InvoiceCreated invoiceCreated) {
        this.customerId = invoiceCreated.getCustomerId();
    }

}

For completeness, and to make this easier to verify, here are the protobuf definitions.

It’s that easy !!! Loving the programming model ;)

customer_api.proto:

syntax = "proto3";

package com.customer;

option java_outer_classname = "CustomerAPI";

import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
import "akkaserverless/annotations.proto";

message CreateCustomerCommand {
    string customer_id = 1 [(akkaserverless.field).entity_key = true];
    string name = 2;
}

message GetCustomerCommand {
    string customer_id = 1 [(akkaserverless.field).entity_key = true];
}

message GetCustomerResponse {
    string name = 1;
}

service CustomerService {

    rpc CreateCustomer(CreateCustomerCommand) returns (google.protobuf.Empty) {
        option (google.api.http) = {
            post: "/customers/create"
            body: "*"
        };
    }

    rpc GetAccount(GetCustomerCommand) returns (GetCustomerResponse) {
        option (google.api.http) = {
            get: "/customers/get/{customer_id}"
        };
    }
}

invoice_api.proto:

syntax = "proto3";

package com.invoice;

option java_outer_classname = "InvoiceAPI";

import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
import "akkaserverless/annotations.proto";

message CreateInvoiceCommand {
    string invoice_id = 1 [(akkaserverless.field).entity_key = true];
    string customer_id = 2;
    string name = 3;
}

message GetInvoiceResponse {
    string name = 1;
}

service InvoiceService {

    rpc CreateInvoice(CreateInvoiceCommand) returns (GetInvoiceResponse) {
        option (google.api.http) = {
            post: "/invoices/create"
            body: "*"
        };
    }

}

and invoice_domain.proto:

syntax = "proto3";

package com.customer;

option java_outer_classname = "InvoiceDomain";


import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
import "akkaserverless/annotations.proto";

message InvoiceCreated {
    string invoice_id = 1;
    string customer_id = 2;
    string name = 3;
}

Thanks!

If it is a simple relationship consisting of only the ID of another aggregate root, there is likely no need for any consistency checking at the service level. Whether or not an invoice is associated with an existing customer is unlikely to affect the validity of its persisted state. It’s just a simple value attribute.

Very likely you have a read side projection that aggregates a list of invoices for a customer. An event listener in the customer service listening for an invoice creation event updates the projection, adding an invoice to a customer. If the listener encounters an invoice with a customer ID that does not exist it can take appropriate action. It can perhaps create a customer entity with default values so that the invoice can be exposed. It can otherwise raise an alert or start a business process to resolve the issue.
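
A minimal sketch of that listener idea, in plain Java with no Akka Serverless APIs. The class and method names (`CustomerInvoicesProjection`, `onCustomerCreated`, `onInvoiceCreated`) are hypothetical, and the in-memory collections only stand in for whatever store backs the read side. An invoice whose customer is not known yet is still recorded, and the customer ID is flagged so that a follow-up action can be taken.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical read-side projection: invoice IDs grouped by customer ID.
class CustomerInvoicesProjection {
    private final Set<String> knownCustomers = new HashSet<>();
    private final Map<String, List<String>> invoicesByCustomer = new HashMap<>();
    private final Set<String> unresolvedCustomers = new HashSet<>();

    // Called when a customer-created event arrives.
    void onCustomerCreated(String customerId) {
        knownCustomers.add(customerId);
        // A late-arriving customer resolves invoices that were seen earlier.
        unresolvedCustomers.remove(customerId);
    }

    // Called when an invoice-created event arrives.
    void onInvoiceCreated(String invoiceId, String customerId) {
        invoicesByCustomer
                .computeIfAbsent(customerId, id -> new ArrayList<>())
                .add(invoiceId);
        if (!knownCustomers.contains(customerId)) {
            // Assumption: flagging is enough here; an alert or business
            // process could be started from this point instead.
            unresolvedCustomers.add(customerId);
        }
    }

    List<String> invoicesFor(String customerId) {
        return new ArrayList<>(invoicesByCustomer.getOrDefault(customerId, new ArrayList<>()));
    }

    Set<String> unresolvedCustomers() {
        return new HashSet<>(unresolvedCustomers);
    }
}
```

Because the projection tolerates events in either order, the invoice entity itself never has to ask whether the customer exists.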

What Phil said.

An Aggregate Root, almost by definition, shouldn’t have to check anything else in order to perform an operation on its data.

Aggregate: A cluster of associated objects that are treated as a unit for the purpose of data changes.

You are proposing a tight coupling between customers and invoices. If that’s truly necessary, then they are not aggregate roots: Customer is the aggregate root and Invoice is an Entity under Customer.
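
A rough sketch of what that alternative would look like, in plain Java rather than any Akka Serverless API. `CustomerAggregate` and its nested `Invoice` are hypothetical names. The point is only that an invoice can be created exclusively through its customer, so the "customer must exist" rule holds by construction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical aggregate root: invoices exist only inside their customer.
class CustomerAggregate {
    private final String customerId;
    private final Map<String, Invoice> invoices = new HashMap<>();

    CustomerAggregate(String customerId) {
        this.customerId = customerId;
    }

    // Invoices are created through the root, so the customer always exists.
    Invoice createInvoice(String invoiceId, String name) {
        if (invoices.containsKey(invoiceId)) {
            throw new IllegalArgumentException("Invoice already exists: " + invoiceId);
        }
        Invoice invoice = new Invoice(invoiceId, customerId, name);
        invoices.put(invoiceId, invoice);
        return invoice;
    }

    int invoiceCount() {
        return invoices.size();
    }

    // Entity owned by the aggregate; it cannot be constructed from outside.
    static final class Invoice {
        final String invoiceId;
        final String customerId;
        final String name;

        private Invoice(String invoiceId, String customerId, String name) {
            this.invoiceId = invoiceId;
            this.customerId = customerId;
            this.name = name;
        }
    }
}
```

The trade-off is that every invoice operation now goes through the customer, which is exactly the tight coupling mentioned above.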

But I like Phil’s way better.

Dear Phil and David,

I completely agree that, in CQRS, the aggregate root is not the place to enforce domain business rules/logic.

But in this programming model we’re mixing two DDD/CQRS concepts in one Java class: the Command Handler (CQRS) and the Domain Entity (DDD). This is OK and quite common in CQRS implementations, but we shouldn’t conflate the two concepts just because they are implemented in the same class.

Axon [see 1], for example, allows Command Handlers to be implemented on the aggregates or, alternatively, as singleton classes.
But AFAIK this is not supported in Akka Serverless (I’m also not sure I would use it, honestly, as the current model is enough for at least my current projects).

In CQRS architectures it’s the responsibility of the Command Handler to enforce those business rules. Other options, like introducing Application Layer facades or specific validation objects/DTOs, have been thoroughly discussed in the CQRS community (by Udi Dahan and Vaughn Vernon, for example; see [2]).

In my example domain it’s a perfectly valid business rule that an invoice must be associated with an existing customer when it is created, so “something” must enforce this rule. That something is the Command Handler, and it will need access to other AR info through Domain Services (those defined in the .proto APIs). If, when rehydrating the AR from its events, the customer with that ID turns out to have been deleted, I will notify the user or take whatever action I consider appropriate. But I still want to enforce this business rule (or any other rule involving two Domain Entities) in my domain.

And my problem is that I don’t know how to invoke those Domain Services synchronously, as shown in the example.

As I said, I fully agree it shouldn’t be validated in the Event Handlers (which are an intrinsic part of the AR, unlike the Command Handlers, which can live in a different Java class).

So, basically, how can we safely invoke a Domain Service from the Domain Entity / AR and act according to the result of the invocation, despite it being an asynchronous call?

I’m missing something like ServiceCall onComplete/onError or the way Lightbend’s Lagom does it.
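
To make the request concrete, here is a rough sketch of the onComplete/onError shape in plain Java using `CompletableFuture`. It does not use any Akka Serverless API. `CreateInvoiceFacade` and the `customerLookup` function are hypothetical stand-ins for a command handler living outside the entity and for the asynchronous call to the customer service.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical facade that checks the customer before the invoice is created.
class CreateInvoiceFacade {
    // Hypothetical async lookup: an empty Optional means "no such customer".
    private final Function<String, CompletableFuture<Optional<String>>> customerLookup;

    CreateInvoiceFacade(Function<String, CompletableFuture<Optional<String>>> customerLookup) {
        this.customerLookup = customerLookup;
    }

    // Completes with a description of the created invoice,
    // or fails if the customer is unknown.
    CompletableFuture<String> createInvoice(String invoiceId, String customerId) {
        return customerLookup.apply(customerId).thenApply(customer -> {
            if (!customer.isPresent()) {
                throw new IllegalStateException("Unknown customer: " + customerId);
            }
            // Assumption: in a real service the create command would be
            // sent to the invoice entity at this point.
            return "InvoiceCreated(" + invoiceId + ", " + customerId + ")";
        });
    }
}
```

The check and the creation are still two separate steps, so the customer could disappear in between; the sketch only shows how the result of the asynchronous call decides whether the command goes ahead.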

Hopefully it’s clearer now.

Thx,

Altair

[1] Command Model - Axon Reference Guide
[2] Domain Model Validation - Kamil Grzybek