Publishing to the same Kafka Topic from different services

(Marvin) #1

I would like to have different services (not different instances of the same service) publish to identical topics. When I try to do this now, an InstanceAlreadyExistsException is ultimately thrown. This is a known issue but I’m not clear on the solution.

The purpose of these different services is to consume information from different 3rd party APIs and convert it to a standard format.

API 1 -> Microservice 1 -> Generic Format
API 2 -> Microservice 2 -> Generic Format

All of our downstream logic is dependent on the Generic Format. Consumers of the Generic Format don’t care what source API it came from and shouldn’t need to be modified if a new API 3 and Microservice 3 are added.

The most straightforward solution I’m aware of is creating a pass-through service like:

API 1 -> Microservice 1 -> Pass Through Service -> Generic Format
API 2 -> Microservice 2 -> Pass Through Service -> Generic Format

But this solution increases latency by adding an extra hop. It also will require us to update the Pass Through Service whenever a new source API is added.

Is there a way to allow the desired behavior of letting multiple different services publish to the same topic?

(Alan Klikic) #2

Hi @marvin,

So you business requirement is to map APIx to a Generic Format. MicroserviceX is responsible for mapping APIx.
These are potential solutions with different pros and cons:

  1. Topic per API

    Every MicroserviceX has its own topic. Consumer services need to subscribe to each.
    Pros: Anti corruption layer fully complies by DDD
    Cons: Every new API requires updating of all consumer services.

  2. Topic per API + Aggregated Topic

    Ever MicroserviceX has its own topic that is aggregated to one topic. Consumer services are subscribing to one topic.
    Pros: Anti corruption layer fully complies by DDD, consumers do not need to be updated when new API is added.
    Cons: Additional service layer is introduced that needs to be maintained.

    Regarding aggregation (additional service layer) these are potential solutions (using different APIs) with different pros and cons:

    a) using Lagom API

      Lagom Topic is the only Lagom API  solution for publishing and Lagom producer works only with event stream so Entity is required.
      Pros: using strictly Lagom API
      Cons: using Entity is a big overhead in case like this

    b) using Lagom API + Akka stream Kafka

      Lagom API  is used for consuming APIx topics and Akka stream Kafka is used to publish aggregation to on topic (without using event store). You can chain publish acknowledge with consumer offset to get at least once semantic like offered by Lagom API. 
      Pros: no entity overhead
      Cons: can not use strictly Lagom API. But Akka stream kafka is already included as a dependencies (no need to maintain it) and used in Lagom Topic Producer implementation.

    c) using Kafka stream

      Solution suggested by @hythloday :
      Pros: Kafka native aggregation
      Cons: can not use strictly Lagom API. Need to add dependency and maintain it.
  3. One topic

    Every MicroserviceX is publishing to same topic.
    Pros:  Consumers do not need to be updated when new API  is added. No additional layer that needs to be maintained.
    Cons:  Anti corruption layer does not comply to DDD. Maybe even DDD anti pattern (DDD experts to comment on)
    This can be implemented in Lagom in two different ways:
     a) One Lagom service descriptor
          This solution works only if service descriptor defines only topics and no service calls.
          Topic, in opposite of service calls, does not require service locator and therefor multiple services can implement it (this is a hack ;)).
          Additional note: when subscribing to a topic, Lagom generates Kafka groupId based on the service name from service descriptor. Meaning that each service implementing the same descriptor and subscribing to any service topic will share groupId, meaning that all services will be in the same Kafka group, meaning message partitions will be loadbalanced between. This can be solved by specifying withGroupId with the different id per service implemenattion, for every subscribe.
          Pros: one service descriptor to maintain. Consumers subscribe to one service descriptor. 
          Cons: Generally a hack. Can not have service calls.
     b) Lagom service descriptor per Microservice X with shared topicId
         MicroserviceX has its own service descriptor but topicId is shared.
         To enable consumer services to subscribe to one topic these are additional steps that need to be done. These are potential solutions:
             1) "centralized" service descriptor with only topic definition. This service descriptor is not implemented by any service and only used for consuming.
             2) choosing one of the Microservice X  descriptors  to subscribe to
         Pros: Not a big hack from Lagom perspective ;). Can use service calls.
         Cons: Consumer service subscribing to one topic is very nasty in both provided solutions and not easy to maintain.

#4a is something that you are trying now and getting InstanceAlreadyExistsException
I have already implemented this solution with no problems but did not test this using runAll. So it should be something related to running it on one JVM.
This needs to be checked by Lagom team. Aslo take in mind that this solution is not something Lagom was designed for so… ;)

I was curious about it and was checking the reason for InstanceAlreadyExistsException and found this:

When a Kafka Producer is created, it attempts to register an MBean using the as its unique identifier.

There are two possibilities of why you are getting the InstanceAlreadyExistsException warning:
1) You are attempting to initialize more than one Producer at a time with the same property on the same JVM.
2) You are not calling close() on an existing Producer before initializing another Producer. Calling close() unregisters the MBean.

#1 sounds like a potential cause of the problem.
Client id in Lagom, if I’m correct, is generated based on topic id and event shard id (similar as eventprocessorid in offsetstore table).
So each service implementing the same service descriptor will share
Running this services on the same JVM, could then, based on the mentioned issue, be the cause of the problem.

I hope this helps. If you need more details on specific topic do not hesitate to ask.

(Marvin) #3

@aklikic thanks, this is extremely helpful. We’re going to start off with solution 1 and stick to best practices for now. Its nice to have the other options clearly laid out.

About InstanceAlreadyExistsException:

  • In the failing code, I was using inheritance for the microservices that were implemented and this compounded the problem. Meaning that a BaseMicroservice API and implementation were defined. Then MicroserviceX and MicroserviceY extended / implemented different parts of BaseMicroservice. MicroserviceX and MicroserviceY both published Generic Format.
  • The part of this design that was leading to the exception was that the sharding tag was defined in the inherited Event class with something like AggregateEventShards<BookEvent> TAG = AggregateEventTag.sharded(CommonBookEvent.class, NUM_SHARDS);. So the extending services had different service names but had the same topic and sharding namespace.
  • Based on this, I came to to the same conclusion about client ID, but I wasn’t able to confirm it. I added to application.conf but that didn’t kill the exception.
  • Ultimately, what worked was to make sure that Microservice X & Y defined a sharding tag in a unique namespace.