Akka Sharded Cluster does not get messages from Kafka Consumer

Hey guys,

I am on a quest to integrate Akka Cluster Sharding (Java) with Kafka, so I took the killrweather sample application from the Lightbend website and modified it following the "Akka Cluster Sharding" doc (for some reason I can't paste more than 2 links, but the link is in the issue below).

My repo with the killrweather sample app: https://github.com/marciomarinho/akka-samples-cluster-sharding-kafka-java

The issue where Sean responded with detailed explanations: https://github.com/akka/alpakka-kafka/issues/1189

Long story short, I realised the messages were not getting consumed, and @seglo helped me test the Alpakka integration. After I applied the modifications Sean suggested, the stream worked and I was able to see the messages arriving:

    public static void initSharding(ActorSystem<?> system) {
//            ClusterSharding.get(system).init(Entity.of(TypeKey, entityContext ->
//              WeatherStation.create(entityContext.getEntityId())
//            ));

        String groupId = "register-trade-topic-group-id";
        EntityTypeKey<User> typeKey = EntityTypeKey.create(User.class, groupId);

        CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<User>> messageExtractor =
                KafkaClusterSharding.get(system)
                        .messageExtractorNoEnvelope(
                                REGISTER_TRADE_TOPIC,
                                Duration.ofSeconds(10),
                                (User msg) -> msg.id,
                                ConsumerSettings.create(
                                        Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer())
                                        .withBootstrapServers("localhost:9092")
                                        .withGroupId(
                                                typeKey
                                                        .name()));

        messageExtractor.thenAccept(
                extractor ->
                        ClusterSharding.get(system)
                                .init(
                                        Entity.of(typeKey, ctx -> userBehaviour(ctx.getEntityId()))
//                                        Entity.of(typeKey, ctx -> WeatherStation.create(ctx.getEntityId()))
                                                .withAllocationStrategy(
                                                        new ExternalShardAllocationStrategy(
                                                                system, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))
                                                .withMessageExtractor(extractor)));

        akka.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener =
                KafkaClusterSharding.get(system).rebalanceListener(typeKey);

        ConsumerSettings<String, byte[]> consumerSettings =
                ConsumerSettings.create(
                        Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
                        .withBootstrapServers("localhost:9092")
                        .withGroupId(
                                typeKey
                                        .name()); // use the same group id as we used in the `EntityTypeKey` for `User`

        // pass the rebalance listener to the topic subscription
        AutoSubscription subscription =
                Subscriptions.topics(REGISTER_TRADE_TOPIC)
                        .withRebalanceListener(Adapter.toClassic(rebalanceListener));

        Consumer.plainSource(consumerSettings, subscription)
                //.via(userBusiness()) // put your business logic, or omit to just try starting the stream
                .map(e -> {
                            String s = new String(e.value());
                            System.out.println(s);
                            return s;
                        }
                )
                .runWith(Sink.ignore(), system);
    }

This produces the following application log:

  • ClusterSingletonManager state change [Start -> Younger]

    {
      "name": "Molecule Man",
      "age": 29,
      "secretIdentity": "Dan Jukes",
      "powers": [
        "Radiation resistance",
        "Turning tiny",
        "Radiation blast"
      ]
    }

The messages are arriving at the Kafka consumer, but they are not being dispatched to the sharded entities (the consumers), which is what the "Aligning Kafka Partitions with Akka Cluster Sharding" sample on GitHub shows.

Btw, that Scala sample does not work for me either. It does not dispatch the messages to the consumers as the docs say it should:

[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 29->45
[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 29 to cluster sharding
[info] [2020-01-16 09:51:38,673] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/system/sharding/user-processing/75/29] - user 29 purchase cat t-shirt, quantity 0, price 8874
[info] [2020-01-16 09:51:39,702] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 60->111
[info] [2020-01-16 09:51:39,703] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 60 to cluster sharding
[info] [2020-01-16 09:51:39,706] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/system/sharding/user-processing/2/60] - user 60 purchase cat t-shirt, quantity 2, price 9375
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 75->1
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 75 to cluster sharding

So, if there is anyone with this knowledge, help me please!!!

I think I was able to solve the problem!!! :grinning:

The solution was to explicitly send each message to the sharded entity at the point where the Kafka consumer stream receives it.

Main init sharding method:

    public static void initSharding(ActorSystem<?> system) {
//            ClusterSharding.get(system).init(Entity.of(TypeKey, entityContext ->
//              WeatherStation.create(entityContext.getEntityId())
//            ));

        CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<WeatherStation.Command>> messageExtractor =
                KafkaClusterSharding.get(system)
                        .messageExtractorNoEnvelope(
                                REGISTER_TRADE_TOPIC,
                                Duration.ofSeconds(10),
                                (WeatherStation.Command msg) -> msg.getId(),
                                ConsumerSettings.create(
                                        Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer())
                                        .withBootstrapServers("localhost:9092")
                                        .withGroupId(
                                                TypeKey
                                                        .name()));

        messageExtractor.thenAccept(
                extractor ->
                        ClusterSharding.get(system)
                                .init(
                                        Entity.of(TypeKey, ctx -> create(ctx.getEntityId())) //userBehaviour(ctx.getEntityId()))
//                                        Entity.of(typeKey, ctx -> WeatherStation.create(ctx.getEntityId()))
                                                .withAllocationStrategy(
                                                        new ExternalShardAllocationStrategy(
                                                                system, TypeKey.name(), Timeout.create(Duration.ofSeconds(5))))
                                                .withMessageExtractor(extractor)));

        akka.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener =
                KafkaClusterSharding.get(system).rebalanceListener(TypeKey);

        ConsumerSettings<String, byte[]> consumerSettings =
                ConsumerSettings.create(
                        Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
                        .withBootstrapServers("localhost:9092")
                        .withGroupId(
                                TypeKey
                                        .name()); // use the same group id as the name of the `EntityTypeKey`

        // pass the rebalance listener to the topic subscription
        AutoSubscription subscription =
                Subscriptions.topics(REGISTER_TRADE_TOPIC)
                        .withRebalanceListener(Adapter.toClassic(rebalanceListener));

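        Consumer.plainSource(consumerSettings, subscription)
//                .via(userBusiness(system, 1)) // put your business logic, or omit to just try starting the stream
                .map(e -> {
                            // print the raw Kafka record value
                            String s = new String(e.value());
                            System.out.println(s);

                            // forward to the sharded entity; still hardcoded: random station id and fixed data
                            // (`r` is assumed to be a java.util.Random field; the returned CompletionStage is ignored)
                            userBusiness(system, r.nextInt(128) + 1, new Data(1234L, null, 33.44d, "aaaa"));

                            return s;
                        }
                )
                .runWith(Sink.ignore(), system);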

    }

Pushing data to the shard:

    private static CompletionStage<WeatherStation.DataRecorded> userBusiness(ActorSystem<?> system, long wsid, WeatherStation.Data data) {
        ClusterSharding sharding = ClusterSharding.get(system);
        EntityRef<Command> ref = sharding.entityRefFor(WeatherStation.TypeKey, Long.toString(wsid));
        return ref.ask(replyTo -> new WeatherStation.Record(data, System.currentTimeMillis(), replyTo), Duration.ofSeconds(10));
    }

After that I could see the messages being dispatched and consumed by the consumers:

[2020-08-28 19:28:10,250] [DEBUG] [akka://KillrWeather@127.0.0.1:2553] [sample.killrweather.WeatherStation] [KillrWeather-akka.actor.default-dispatcher-29] [akka://KillrWeather/system/sharding/register-trade-topic-group-id/27/87] - 1 total readings from station 87, type null, average 33.44, diff: processingTime - eventTime: 1598606888861 ms
[2020-08-28 19:28:10,250] [DEBUG] [akka://KillrWeather@127.0.0.1:2555] [sample.killrweather.WeatherStation] [KillrWeather-akka.actor.default-dispatcher-23] [akka://KillrWeather/system/sharding/register-trade-topic-group-id/75/8] - 1 total readings from station 8, type null, average 33.44, diff: processingTime - eventTime: 1598606888861 ms
gewtrwerwerrt
[2020-08-28 19:28:11,656] [DEBUG] [akka://KillrWeather@127.0.0.1:2555] [sample.killrweather.WeatherStation] [KillrWeather-akka.actor.default-dispatcher-23] [akka://KillrWeather/system/sharding/register-trade-topic-group-id/83/88] - 1 total readings from station 88, type null, average 33.44, diff: processingTime - eventTime: 1598606890416 ms
nbdcnbcbnnbcvnbv
[2020-08-28 19:28:12,793] [DEBUG] [akka://KillrWeather@127.0.0.1:2553] [sample.killrweather.WeatherStation] [KillrWeather-akka.actor.default-dispatcher-29] [akka://KillrWeather/system/sharding/register-trade-topic-group-id/25/81] - 1 total readings from station 81, type null, average 33.44, diff: processingTime - eventTime: 1598606891556 ms
tw432545453245
rwerttwre
[2020-08-28 19:28:13,909] [DEBUG] [akka://KillrWeather@127.0.0.1:2553] [sample.killrweather.WeatherStation] [KillrWeather-akka.actor.default-dispatcher-50] [akka://KillrWeather/system/sharding/register-trade-topic-group-id/30/52] - 1 total readings from station 52, type null, average 33.44, diff: processingTime - eventTime: 1598606892667 ms
[2020-08-28 19:28:13,910] [DEBUG] [akka://KillrWeather@127.0.0.1:2555] [sample.killrweather.WeatherStation] [KillrWeather-akka.actor.default-dispatcher-23] [akka://KillrWeather/system/sharding/register-trade-topic-group-id/94/10] - 1 total readings from station 10, type null, average 33.44, diff: processingTime - eventTime: 1598606892667 ms

It is still hardcoded and I need to clean up and implement things properly, but all the dots seem to be connected now!!!
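
A note for anyone replacing the hardcoded random station ID: as far as I understand from the Alpakka Kafka docs, the message extractor derives the shard from the entity ID with the same murmur2-based algorithm that Kafka's default partitioner applies to the record key. That is what the `entityId->partition` lines in the Scala sample's log reflect. So the entity ID passed to sharding should probably be the Kafka record key rather than a random number. Below is a minimal, standalone sketch of that mapping. `KafkaPartitionSketch` and `partitionFor` are hypothetical names, the 128 partitions are an assumption, and the hash is re-implemented here only for illustration; in real code you would call `org.apache.kafka.common.utils.Utils.murmur2` and `Utils.toPositive` instead.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only: mirrors how a keyed Kafka record
// is mapped to a partition (murmur2 hash of the key bytes, made positive,
// modulo the partition count).
final class KafkaPartitionSketch {

    private KafkaPartitionSketch() {}

    // 32-bit murmur2 hash, written to follow Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;

        // Mix four bytes at a time (little-endian).
        for (int i = 0; i + 4 <= length; i += 4) {
            int k = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1 to 3 bytes, if any.
        final int tail = length & ~3;
        final int rem = length % 4;
        if (rem == 3) {
            h ^= (data[tail + 2] & 0xff) << 16;
        }
        if (rem >= 2) {
            h ^= (data[tail + 1] & 0xff) << 8;
        }
        if (rem >= 1) {
            h ^= data[tail] & 0xff;
            h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition (and therefore shard) for a given key; the partition count is an assumption.
    static int partitionFor(String key, int numPartitions) {
        int hash = murmur2(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 128; // assumed partition count of the topic
        for (String entityId : new String[] {"29", "60", "75"}) {
            System.out.println("entityId->partition " + entityId + "->"
                    + partitionFor(entityId, numPartitions));
        }
    }
}
```

If the producer writes each record with the entity ID as its key, the entity for that record should end up on the node that consumes the record's partition, which is the whole point of combining the rebalance listener with `ExternalShardAllocationStrategy`.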