Akka Cassandra projection not working when used along with Kafka sharded cluster

Hey guys,

I have a project that uses an Akka sharded cluster with Kafka and persistent actors on Cassandra, and that part works fine. The application consumes messages from Kafka, dispatches them to the sharded actors for processing, persists their state for resilience, performs the business process, and then uses Akka Projections for delivery guarantees on another Kafka topic for downstream processing.
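
For context, the Trade entity is an EventSourcedBehavior persisted to Cassandra. Heavily trimmed, it looks roughly like this (command/event payloads, state handling, replies and serialization are simplified, and the tagging shown is only illustrative):

// Heavily trimmed sketch of the Trade entity (illustrative only): real command/event
// payloads, state handling, replies and serialization markers are omitted.
public class Trade extends EventSourcedBehavior<Trade.Command, Trade.Event, Trade.State> {

    public interface Command {}
    public interface Event {}
    public static final class State {}

    public static final class Register implements Command {
        public final TradeCaptureReport report; // payload shape is illustrative
        public Register(final TradeCaptureReport report) { this.report = report; }
    }

    public static final class Registered implements Event {
        public final TradeCaptureReport report;
        public Registered(final TradeCaptureReport report) { this.report = report; }
    }

    public static Behavior<Command> create(final String entityId, final PersistenceId persistenceId) {
        return new Trade(persistenceId); // entityId is used for logging in the real code
    }

    private Trade(final PersistenceId persistenceId) {
        super(persistenceId);
    }

    @Override
    public State emptyState() {
        return new State();
    }

    @Override
    public CommandHandler<Command, Event, State> commandHandler() {
        return newCommandHandlerBuilder()
                .forAnyState()
                .onCommand(Register.class, (state, cmd) -> Effect().persist(new Registered(cmd.report)))
                .build();
    }

    @Override
    public EventHandler<State, Event> eventHandler() {
        return newEventHandlerBuilder().forAnyState().onAnyEvent((state, event) -> state);
    }

    // Tagging shown for illustration: the eventsByTag("trades-1") source used by the
    // projection further down only emits events that were persisted with this tag.
    @Override
    public Set<String> tagsFor(final Event event) {
        return Set.of("trades-1");
    }
}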

Here is my Kafka Sharded Cluster setup:

public TradeKafkaProcessorService(final ActorSystem<?> system, final String kafkaBootstrap) {

        sharding = ClusterSharding.get(system);
        objectMapper = JacksonObjectMapperProvider.get(Adapter.toClassic(system))
                .getOrCreate("jackson-json", Optional.empty());

        final String securityProtocol = ConfigFactory.load().getString(SECURITY_PROTOCOL_KEY);
        final String sslProtocol = ConfigFactory.load().getString(SSL_PROTOCOL_KEY);

        CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<Trade.Command>> messageExtractor =
                KafkaClusterSharding.get(system)
                        .messageExtractorNoEnvelope(
                                REGISTER_TRADE_TOPIC,
                                Duration.ofSeconds(10),
                                (Trade.Command msg) -> msg.toString(),
                                ConsumerSettings.create(
                                        Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer())
                                        .withBootstrapServers(kafkaBootstrap)
                                        .withProperty(SECURITY_PROTOCOL, securityProtocol)
                                        .withProperty(SSL_PROTOCOL, sslProtocol)
                                        .withGroupId(
                                                ENTITY_TYPE_KEY
                                                        .name()));

        messageExtractor.thenAccept(
                extractor ->
                        ClusterSharding.get(system)
                                .init(
                                        Entity.of(
                                                ENTITY_TYPE_KEY,
                                                entityContext ->
                                                        Trade.create(
                                                                entityContext.getEntityId(),
                                                                PersistenceId.of(
                                                                        entityContext.getEntityTypeKey().name(), entityContext.getEntityId())))
                                                .withAllocationStrategy(
                                                        new ExternalShardAllocationStrategy(
                                                                system, ENTITY_TYPE_KEY.name(), Timeout.create(Duration.ofSeconds(5))))
                                                .withMessageExtractor(extractor)));

        ActorRef<ConsumerRebalanceEvent> rebalanceListener =
                KafkaClusterSharding.get(system).rebalanceListener(ENTITY_TYPE_KEY);

        ConsumerSettings<String, byte[]> consumerSettings =
                ConsumerSettings.create(
                        Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
                        .withBootstrapServers(kafkaBootstrap)
                        .withGroupId(ENTITY_TYPE_KEY.name());

        // pass the rebalance listener to the topic subscription
        AutoSubscription subscription =
                Subscriptions.topics(REGISTER_TRADE_TOPIC)
                        .withRebalanceListener(Adapter.toClassic(rebalanceListener));

        // Consume the trade records, deserialize the JSON payload and hand it over for processing.
        Consumer.plainSource(consumerSettings, subscription)
                .map(record -> {
                    final String key = record.key();
                    final String value = new String(record.value());

                    final TradeCaptureReport tradeCaptureReport =
                            objectMapper.readValue(value, TradeCaptureReport.class);
                    handleMessage(key, tradeCaptureReport);

                    return tradeCaptureReport;
                })
                .runWith(Sink.ignore(), system);


    }
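
handleMessage is not shown above; it simply resolves the sharded Trade entity for the record key and sends it a command, roughly like this (simplified, the Register command shape is illustrative):

// Simplified sketch of handleMessage (illustrative): it resolves the sharded Trade
// entity for the Kafka record key and forwards the report as a command.
private void handleMessage(final String key, final TradeCaptureReport report) {
    final EntityRef<Trade.Command> entityRef = sharding.entityRefFor(ENTITY_TYPE_KEY, key);
    entityRef.tell(new Trade.Register(report)); // command shape is illustrative
}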

The application works fine, as the logs show:

27.0.0.1:2553/system/cluster/core/daemon#-1697575068]] to [akka://Trade@127.0.0.1:53617]
[2020-09-16 11:50:44,907] [INFO] [akka://Trade@127.0.0.1:2553] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-36] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@127.0.0.1:2553] - Node [akka://Trade@127.0.0.1:53617] is JOINING, roles [dc-default]
[2020-09-16 11:50:44,909] [INFO] [akka://Trade@127.0.0.1:53617] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-3] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@127.0.0.1:53617] - Welcome from [akka://Trade@127.0.0.1:2553]
[2020-09-16 11:50:46,176] [INFO] [akka://Trade@127.0.0.1:2553] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-34] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@127.0.0.1:2553] - Leader is moving node [akka://Trade@127.0.0.1:2554] to [Up]
[2020-09-16 11:50:46,177] [INFO] [akka://Trade@127.0.0.1:2553] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-34] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@127.0.0.1:2553] - Leader is moving node [akka://Trade@127.0.0.1:53617] to [Up]
[2020-09-16 11:50:46,690] [INFO] [akka://Trade@127.0.0.1:2554] [akka.cluster.singleton.ClusterSingletonManager] [Trade-akka.actor.default-dispatcher-16] [akka://Trade@127.0.0.1:2554/system/sharding/register-trade-topic-group-idCoordinator] - ClusterSingletonManager state change [Start -> Younger]
[2020-09-16 11:50:46,809] [INFO] [akka://Trade@127.0.0.1:53617] [akka.cluster.singleton.ClusterSingletonManager] [Trade-akka.actor.default-dispatcher-3] [akka://Trade@127.0.0.1:53617/system/sharding/register-trade-topic-group-idCoordinator] - ClusterSingletonManager state change [Start -> Younger]
[2020-09-16 11:51:08,785] [INFO] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Starting Trade 1
[2020-09-16 11:51:08,826] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Initializing snapshot recovery: Recovery(SnapshotSelectionCriteria(9223372036854775807,9223372036854775807,0,0),9223372036854775807,9223372036854775807)
[2020-09-16 11:51:08,848] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Snapshot recovered from 0 Map() VersionVector()
[2020-09-16 11:51:08,853] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Replaying events: from: 1, to: 9223372036854775807
[2020-09-16 11:51:09,066] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Recovery successful, recovered until sequenceNr: [9]
[2020-09-16 11:51:09,066] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Returning recovery permit, reason: replay completed successfully
[2020-09-16 11:51:09,068] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Recovery for persistenceId [PersistenceId(register-trade-topic-group-id|1)] took 217.4 ms
[2020-09-16 11:51:09,092] [INFO] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - State {trades=9, latest state=REGISTERED}
[2020-09-16 11:51:09,097] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Handled command [systems.clearpay.trade.Trade$Register], resulting effect: [Persist(systems.clearpay.trade.Trade$Registered@5d97b5c5)], side effects: [1]
[2020-09-16 11:51:09,123] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-16] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Received Journal response: WriteMessagesSuccessful after: 22308415 nanos
[2020-09-16 11:51:09,126] [DEBUG] [akka://Trade@127.0.0.1:2554] [systems.clearpay.trade.Trade] [Trade-akka.actor.default-dispatcher-56] [akka://Trade/system/sharding/register-trade-topic-group-id/0/1] - Received Journal response: WriteMessageSuccess(PersistentRepr(register-trade-topic-group-id|1,10,ee21dc38-2229-4ce4-97f7-ece6635d615c,0,None),1) after: 24871053 nanos

But the projection does not work at all. I get no errors, yet it does not do what it is supposed to do: store the projection offset in Cassandra and run the publisher, which is a Kafka producer. Here is my setup:

public static Behavior<Void> create(final String kafkaBootstrap, final Mode mode) {
        return Behaviors.setup(context -> {

            ActorSystem<Void> system = context.getSystem();

            //---> SEND TO KAFKA PROJECTION
            ProducerSettings<String, String> producerSettings =
                    ProducerSettings.create(context.getSystem(), new StringSerializer(), new StringSerializer())
                            .withBootstrapServers(kafkaBootstrap);
            // FIXME classicSystem might not be needed in later Alpakka Kafka version?
            SendProducer<String, String> sendProducer =
                    new SendProducer<>(producerSettings, system.classicSystem());


            // Source: events tagged "trades-1" from the Cassandra read journal
            SourceProvider<Offset, EventEnvelope<WordEnvelope>> sourceProvider =
                    EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "trades-1");

            // At-least-once Cassandra projection that hands every envelope to the Kafka publisher
            Projection<EventEnvelope<WordEnvelope>> projection =
                    CassandraProjection.atLeastOnce(
                            ProjectionId.of("all-trades", "trades-1"),
                            sourceProvider,
                            () -> new WordPublisher(EVENT_SOURCING_TOPIC, sendProducer));

            context.spawn(ProjectionBehavior.create(projection), projection.projectionId().id());

            new TradeKafkaProcessorService(system, kafkaBootstrap);
            ClusterBootstrap.get(Adapter.toClassic(system)).start();

            if (mode.equals(Mode.EKS)) {
                AkkaManagement.get(Adapter.toClassic(system)).start();
            }

            return Behaviors.empty();
        });
    }
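
WordPublisher (the name is kept from the Akka Projections sample I adapted) is a projection Handler that publishes each envelope to the downstream topic via the SendProducer, roughly along these lines (simplified, the payload mapping is illustrative):

// Simplified sketch of the projection handler (illustrative only): each envelope is
// published to the downstream Kafka topic via the SendProducer.
public class WordPublisher extends Handler<EventEnvelope<WordEnvelope>> {

    private final String topic;
    private final SendProducer<String, String> sendProducer;

    public WordPublisher(final String topic, final SendProducer<String, String> sendProducer) {
        this.topic = topic;
        this.sendProducer = sendProducer;
    }

    @Override
    public CompletionStage<Done> process(final EventEnvelope<WordEnvelope> envelope) {
        // The payload serialization here is illustrative; the real handler maps the event
        // to the downstream message format.
        final ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, envelope.persistenceId(), envelope.event().toString());
        return sendProducer.send(record).thenApply(recordMetadata -> Done.getInstance());
    }
}

The intention is that every tagged Trade event is delivered to EVENT_SOURCING_TOPIC at least once, with the projection offset stored in Cassandra.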

I followed these docs:

Also the demo project:

Any ideas on this, folks?

Hey folks,

I finally got it working. I have two versions: one inside the same Kafka sharded project, and another one in a separate project. I have to make a few adjustments, and I will post the solution here when I’m done; maybe it can help somebody else 🙂

Cheers,
MM
