Creating the Cassandra Table and Registering Codecs without relying on the ReadSide Db Handler

Hello all,

In Lagom, the default way to create Cassandra tables and register the Cassandra-to-Java codecs is through the event processor’s “buildHandler()” method, like this:

  override def buildHandler(): ReadSideHandler[CustomerEvt] = {
    readSide.builder[CustomerEvt](READ_SIDE_ID)
      .setGlobalPrepare(() => CustomerDbEventProcessor.this.createTables())
      .setPrepare(_ => prepareStatements())
      .build()
  }

Here, the database table is guaranteed to be created by the “setGlobalPrepare” callback, and the Cassandra codecs are guaranteed to be registered by the “setPrepare” callback.

However, I was wondering if I could do this without using the ReadSideHandler. The downside of this approach is that you only get these guarantees if your data is sourced from a PersistentEntity; if your data comes directly from Kafka messages, you cannot use it.

Is there a way to make sure that 1) the tables are created and 2) the Cassandra codecs are registered without going through the ReadSideProcessor? Perhaps adding some code in the ApplicationLoader might make it work…?

Thanks,

@lejoow
In the event processor:

  1. the function provided to globalPrepare is executed by a cluster singleton actor with backoff
    • the cluster singleton ensures that it is executed only once in the cluster
    • backoff ensures that execution is retried in case of failure
  2. the function provided to prepare is executed by an actor with backoff
    • executed once per instance
    • backoff ensures that execution is retried in case of failure
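
To make "with backoff" concrete, here is a minimal sketch of how a retry delay is typically derived from a minimum delay, a maximum delay, and a random factor. The class and method names (BackoffDelay, delay) are hypothetical and used only for illustration; the formula is the usual exponential-backoff-with-jitter shape and is not claimed to be the exact calculation Lagom or Akka performs internally.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: exponential backoff with random jitter (illustration only). */
final class BackoffDelay {
    private BackoffDelay() {}

    /**
     * @param attempt      number of failures so far (0 gives the minimum delay)
     * @param min          delay used for the first retry
     * @param max          cap applied to the exponential part
     * @param randomFactor 0.2 means up to 20% extra delay is added as jitter
     * @param rnd          random value in [0, 1), passed in so the result is testable
     */
    static Duration delay(int attempt, Duration min, Duration max,
                          double randomFactor, double rnd) {
        // Double the minimum delay for every failed attempt, then cap it
        double exponential = min.toMillis() * Math.pow(2, attempt);
        double capped = Math.min(exponential, max.toMillis());
        // Jitter spreads out retries so that instances do not retry in lockstep
        double jitter = 1.0 + rnd * randomFactor;
        return Duration.ofMillis((long) (capped * jitter));
    }

    /** Same calculation, drawing the jitter from a thread-local random source. */
    static Duration delay(int attempt, Duration min, Duration max, double randomFactor) {
        return delay(attempt, min, max, randomFactor,
                ThreadLocalRandom.current().nextDouble());
    }
}
```

With a minimum of 1 second and a maximum of 10 seconds, the delays before jitter grow as 1s, 2s, 4s, 8s and then stay at 10s.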

If you need this kind of execution but do not want to use the event processor, you need to implement it yourself.
So:

  1. implement an actor that takes the function and executes it in preStart. A failed execution should trigger actor termination
  2. in the Impl class constructor
    a) for globalPrepare, create the actor from #1 with the provided execution function and wrap it in a cluster singleton with backoff
    b) for prepare, create the actor from #1 with the defined execution function and wrap it with backoff

You could also do it without actors, but then in case of failure you would have no mechanism for retrying.
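
If actors are not an option, the retry would have to be written by hand. Below is a minimal sketch using only the JDK, assuming the prepare step is exposed as a Supplier of a CompletionStage. The names RetryingPrepare, run and attempt are hypothetical, and unlike the singleton approach this gives no cluster-wide "run once" guarantee; it only retries locally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: retries an asynchronous action with a doubling delay. */
final class RetryingPrepare {
    private RetryingPrepare() {}

    /** Runs the action, retrying on failure until it succeeds or maxAttempts is reached. */
    static <T> CompletionStage<T> run(Supplier<CompletionStage<T>> action,
                                      ScheduledExecutorService scheduler,
                                      long minDelayMillis, long maxDelayMillis,
                                      int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(action, scheduler, minDelayMillis, maxDelayMillis, maxAttempts, 1, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletionStage<T>> action,
                                    ScheduledExecutorService scheduler,
                                    long delayMillis, long maxDelayMillis,
                                    int maxAttempts, int attemptNo,
                                    CompletableFuture<T> result) {
        CompletionStage<T> stage;
        try {
            stage = action.get();
        } catch (RuntimeException e) {
            // Treat a synchronous exception the same way as a failed stage
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            stage = failed;
        }
        stage.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                // Give up and surface the last error to the caller
                result.completeExceptionally(error);
            } else {
                // Wait delayMillis, then retry with the delay doubled (capped at the maximum)
                long nextDelay = Math.min(delayMillis * 2, maxDelayMillis);
                scheduler.schedule(
                        () -> attempt(action, scheduler, nextDelay, maxDelayMillis,
                                maxAttempts, attemptNo + 1, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

A caller would pass something like a createTables or registerCodecs function as the action, together with a scheduler such as Executors.newSingleThreadScheduledExecutor(), and shut the scheduler down once the returned stage completes.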

I can provide some examples if required.

Hope this helps.

Thanks a lot for the reply @aklikic!

I understand that you have to implement your own Singleton Actor. If you could provide some examples, that would be really helpful.

Appreciate your help in advance!

@lejoow will share the code tomorrow. Did not have time today.

This does not use a functional interface, but it will do.

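// Imports used across the classes below (each public class goes in its own file)
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.singleton.ClusterSingletonManager;
import akka.cluster.singleton.ClusterSingletonManagerSettings;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Backoff;
import akka.pattern.BackoffSupervisor;
import akka.pattern.PatternsCS;
import akka.pattern.PipeToSupport;
import scala.concurrent.duration.FiniteDuration;

// A single preparation step, e.g. creating a table or registering codecs
public interface PrepareAction {
	CompletionStage<Done> prepare();
}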

public class PrepareRegisterActor extends AbstractActor{
	private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
	private final List<PrepareAction> actions;
	public static Props props(PrepareAction...actions) {
	        return Props.create(PrepareRegisterActor.class,Arrays.asList(actions));
	}

	public PrepareRegisterActor(List<PrepareAction> actions){
		this.actions=actions;
		
	}
	 @Override
	public void preStart() {
		 logger.info("Started...");
	     pipe(doAll(actions.stream().map(PrepareAction::prepare).collect(Collectors.toList()))).to(getSelf());
	}
	 
	 @Override
	public Receive createReceive() {
		 return receiveBuilder()
				.match(Done.class, message -> handleDone())
				.match(Status.Failure.class, this::handleFailure)
				.build();
				
	}
	 
	 private void handleDone() {
		 logger.info("Done");
	 }
	 // Rethrowing makes the actor fail, so the BackoffSupervisor restarts it after the backoff delay
	 private void handleFailure(Status.Failure failure) {
		 Throwable error = failure.cause();
		 //logger.error("Exception occurred. Will retry: {}",error);
		 throw new RuntimeException(error);
	 }

	private <T> PipeToSupport.PipeableCompletionStage<T> pipe(CompletionStage<T> completionStage) {
		return PatternsCS.pipe(completionStage, getContext().dispatcher());
	}

	@Override
	public void postStop() {
		logger.info("Stopped");
	}
	
	public static <T> CompletionStage<Done> doAll(CompletionStage<T>... stages) {
		return doAll(Arrays.asList(stages));
	}

	public static <T> CompletionStage<Done> doAll(List<CompletionStage<T>> stages) {
		CompletionStage<Done> result = CompletableFuture.completedFuture(Done.getInstance());
		for (CompletionStage<?> stage : stages) {
			result = result.thenCombine(stage, (d1, d2) -> Done.getInstance());
		}
		return result;
	}
	
}

public class PrepareRegister {
	
	public static void global(ActorSystem system,String prepareActorName,FiniteDuration backOffMin, FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction...actions) {
		apply(system, true, prepareActorName, backOffMin, backOffMax, backOffRandomFactor, actions);
	}
	
	public static void local(ActorSystem system,String prepareActorName,FiniteDuration backOffMin, FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction...actions) {
		apply(system, false, prepareActorName, backOffMin, backOffMax, backOffRandomFactor, actions);
	}

	private static void apply(ActorSystem system, boolean singleton, String prepareActorName, FiniteDuration backOffMin, FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction...actions) {

		Props prepareActor = PrepareRegisterActor.props(actions);
		// Restart the prepare actor with increasing delays whenever it fails
		Props backoffProps = BackoffSupervisor.props(
				Backoff.onFailure(
						prepareActor,
						prepareActorName,
						backOffMin,
						backOffMax,
						backOffRandomFactor
				)
		);
		if (singleton) {
			// Global prepare: run the supervised actor as a cluster singleton
			ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(system);
			system.actorOf(
					ClusterSingletonManager.props(backoffProps, PoisonPill.getInstance(), settings),
					prepareActorName + "-Singleton"
			);
		} else {
			// Local prepare: one supervised actor per service instance
			system.actorOf(backoffProps, prepareActorName + "-BackOff");
		}
	}
	
	

}
//simple test (global is done in the same way)
public class TestPrepareAction implements PrepareAction{
	
	private Integer id;
	private boolean fail;
	
	public TestPrepareAction(Integer id, boolean fail) {
		super();
		this.id=id;
		this.fail = fail;
	}
	
	public void setFail(boolean fail) {
		this.fail=fail;
	}

	@Override
	public CompletionStage<Done> prepare() {
		if(!fail) {
			return CompletableFuture.completedFuture(Done.getInstance());
		}else {
			CompletableFuture<Done> cf=new CompletableFuture<Done>();
			cf.completeExceptionally(new Exception("("+id+")Error"));
			return cf;
		}
	}

}

public class Test {
	
	public static void main(String[] args) {
		final ActorSystem system=ActorSystem.create();
		TestPrepareAction localFail=new TestPrepareAction(2, true);
		PrepareRegister.local(system, 
							   "localPrepare", 
							   FiniteDuration.create(1, TimeUnit.SECONDS), 
							   FiniteDuration.create(10, TimeUnit.SECONDS), 
							   0.2d, new TestPrepareAction(1,false),localFail);
		
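		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of silently swallowing it
			Thread.currentThread().interrupt();
		}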
		System.out.println("Setting #2 to not fail");
		localFail.setFail(false);
	}

}

Appreciate it! Thanks a lot