So your business requirement is to map APIx to a Generic Format, and MicroserviceX is responsible for mapping APIx.
These are potential solutions with different pros and cons:
Topic per API
Every MicroserviceX has its own topic. Consumer services need to subscribe to each.
Pros: The anti-corruption layer fully complies with DDD.
Cons: Every new API requires updating all consumer services.
Topic per API + Aggregated Topic
Every MicroserviceX has its own topic, and all of them are aggregated into one topic. Consumer services subscribe to that single topic.
Pros: The anti-corruption layer fully complies with DDD, and consumers do not need to be updated when a new API is added.
Cons: An additional service layer is introduced that needs to be maintained.
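The pros/cons above can be sketched with a toy model (all names here, like "generic-format", are illustrative, not real topic names): adding a new API changes every consumer's subscription list in the topic-per-API topology, but leaves consumers of the aggregated topic untouched.

```java
import java.util.*;

// Toy comparison of the two topologies: with a topic per API, every
// consumer's subscription list grows when a new API is added; with one
// aggregated topic, the consumers are untouched.
public class TopologyDemo {
    // Topic-per-API: each consumer must list every API topic explicitly.
    public static Set<String> topicPerApiSubscriptions(List<String> apis) {
        Set<String> subs = new LinkedHashSet<>();
        for (String api : apis) subs.add("topic-" + api);
        return subs;
    }

    // Aggregated: consumers always subscribe to the one generic topic,
    // regardless of how many APIs feed into it.
    public static Set<String> aggregatedSubscriptions(List<String> apis) {
        return Set.of("generic-format");
    }

    public static void main(String[] args) {
        List<String> apis = List.of("api1", "api2");
        List<String> withNewApi = List.of("api1", "api2", "api3");
        // Adding api3 changes every consumer in the first topology...
        System.out.println(topicPerApiSubscriptions(apis).equals(
                topicPerApiSubscriptions(withNewApi)));      // false
        // ...but leaves consumers of the aggregated topic unchanged:
        System.out.println(aggregatedSubscriptions(apis).equals(
                aggregatedSubscriptions(withNewApi)));       // true
    }
}
```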
Regarding the aggregation (the additional service layer), these are potential solutions (using different APIs), each with pros and cons:
a) using Lagom API
Lagom Topic is the only Lagom API for publishing, and the Lagom topic producer works only with an event stream, so a persistent Entity is required.
Pros: using strictly the Lagom API
Cons: using an Entity is a big overhead in a case like this
b) using Lagom API + Akka Streams Kafka
The Lagom API is used for consuming the APIx topics, and Akka Streams Kafka is used to publish the aggregation to one topic (without using an event store). You can chain the publish acknowledge with the consumer offset commit to get the same at-least-once semantics that the Lagom API offers.
Pros: no Entity overhead
Cons: cannot use strictly the Lagom API. However, Akka Streams Kafka is already included as a dependency (no need to maintain it) and is used in the Lagom topic producer implementation.
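The "chain the publish acknowledge with the offset commit" pattern can be modelled in plain Java (a toy sketch only; the real implementation would use Akka Streams Kafka's committable source/sink, and all names here are illustrative). The key point: the consumer offset is committed only after the downstream publish succeeds, so a crash in between causes redelivery, never loss.

```java
import java.util.*;

// Toy model of at-least-once aggregation: publish to the aggregate
// topic first, commit the consumer offset only after the publish "ack".
// A crash between publish and commit makes the message be reprocessed
// on restart -> duplicates are possible, but nothing is lost.
public class AtLeastOnceDemo {
    // Runs the consume->publish->commit loop, simulating one crash after
    // publishing offset crashAfterPublishAt (or none if it is negative),
    // then a restart from the last committed offset.
    public static List<String> aggregateWithCrash(List<String> input,
                                                  int crashAfterPublishAt) {
        List<String> out = new ArrayList<>();    // the aggregate topic
        int committed = 0;                       // durable consumer offset
        boolean crashed = false;
        for (int attempt = 0; attempt < 2; attempt++) {   // run, then restart
            for (int offset = committed; offset < input.size(); offset++) {
                out.add("agg:" + input.get(offset));      // publish (acked)
                if (!crashed && offset == crashAfterPublishAt) {
                    crashed = true;                       // crash: NOT committed
                    break;
                }
                committed = offset + 1;          // commit only after the ack
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Crash after publishing "b" but before committing its offset:
        System.out.println(aggregateWithCrash(List.of("a", "b", "c"), 1));
        // [agg:a, agg:b, agg:b, agg:c]  -- "b" delivered twice, none lost
    }
}
```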
c) using Kafka Streams
Solution suggested by @hythloday:
Pros: Kafka-native aggregation
Cons: cannot use strictly the Lagom API. Need to add the dependency and maintain it.
Shared Topic
Every MicroserviceX publishes to the same topic.
Pros: Consumers do not need to be updated when a new API is added. There is no additional layer that needs to be maintained.
Cons: The anti-corruption layer does not comply with DDD. It may even be a DDD anti-pattern (DDD experts, please comment).
This can be implemented in Lagom in two different ways:
a) One Lagom service descriptor
This solution works only if the service descriptor defines only topics and no service calls.
A topic, in contrast to a service call, does not require the service locator, and therefore multiple services can implement it (this is a hack ;)).
Additional note: when subscribing to a topic, Lagom generates the Kafka groupId based on the service name from the service descriptor. Each service implementing the same descriptor and subscribing to any of its topics will therefore share a groupId, so all these services end up in the same Kafka consumer group, and the topic's partitions are load-balanced between them. This can be solved by specifying withGroupId with a different id per service implementation, for every subscribe.
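Why the shared groupId is a problem can be shown with a toy model of Kafka's group semantics (illustrative only; the real assignment is done by Kafka's group coordinator, and the member/group names below are made up): members of the same group split the partitions between them, so each service sees only part of the topic.

```java
import java.util.*;

// Toy model of Kafka consumer groups: partitions are load-balanced
// (round-robin here) across the members of one group, while members of
// different groups each get every partition.
public class GroupIdDemo {
    // memberGroupIds.get(i) is the groupId used by consumer "member<i>".
    public static Map<String, List<Integer>> assign(List<String> memberGroupIds,
                                                    int partitions) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (int i = 0; i < memberGroupIds.size(); i++) {
            groups.computeIfAbsent(memberGroupIds.get(i), g -> new ArrayList<>())
                  .add("member" + i);
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (List<String> members : groups.values()) {
            for (int p = 0; p < partitions; p++) {
                result.computeIfAbsent(members.get(p % members.size()),
                                       m -> new ArrayList<>()).add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two services implementing the same descriptor -> same groupId:
        System.out.println(assign(List.of("svc-descriptor", "svc-descriptor"), 4));
        // {member0=[0, 2], member1=[1, 3]}  -- each misses half the topic
        // withGroupId per service implementation -> each gets everything:
        System.out.println(assign(List.of("service-a", "service-b"), 4));
        // {member0=[0, 1, 2, 3], member1=[0, 1, 2, 3]}
    }
}
```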
Pros: one service descriptor to maintain. Consumers subscribe to one service descriptor.
Cons: Generally a hack. Cannot have service calls.
b) Lagom service descriptor per MicroserviceX with shared topicId
Each MicroserviceX has its own service descriptor, but the topicId is shared.
To enable consumer services to subscribe to the one topic, additional steps are needed. These are the potential solutions:
1) "centralized" service descriptor with only topic definition. This service descriptor is not implemented by any service and only used for consuming.
2) choosing one of the Microservice X descriptors to subscribe to
Pros: Not a big hack from Lagom perspective ;). Can use service calls.
Cons: Consumer service subscribing to one topic is very nasty in both provided solutions and not easy to maintain.
#4a is what you are trying now, and you are getting InstanceAlreadyExistsException.
I have already implemented this solution with no problems, but did not test it using runAll. So it should be something related to running it in one JVM.
This needs to be checked by the Lagom team. Also keep in mind that this solution is not something Lagom was designed for, so… ;)
I was curious about it, so I checked the reason for the InstanceAlreadyExistsException and found this:
When a Kafka Producer is created, it attempts to register an MBean using the client.id as its unique identifier.
There are two possible reasons why you are getting the InstanceAlreadyExistsException warning:
1) You are attempting to initialize more than one Producer at a time with the same client.id property on the same JVM.
2) You are not calling close() on an existing Producer before initializing another Producer. Calling close() unregisters the MBean.
#1 sounds like a potential cause of the problem.
The client id in Lagom, if I'm correct, is generated based on the topic id and the event shard id (similar to the eventprocessorid in the offsetstore table).
So each service implementing the same service descriptor will share the client.id.
Running these services in the same JVM could then, based on the issue mentioned above, be the cause of the problem.
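The MBean collision itself can be reproduced with plain JMX, no Kafka needed: registering two MBeans under the same ObjectName (which is effectively what two producers sharing a client.id in one JVM do) throws exactly InstanceAlreadyExistsException. The ObjectName below only imitates Kafka's naming; it is illustrative.

```java
import java.lang.management.ManagementFactory;
import javax.management.*;

// Reproduces the warning: the second registration under the same name
// fails with InstanceAlreadyExistsException, just like a second Kafka
// producer with the same client.id in the same JVM.
public class MBeanClashDemo {
    public interface DummyMBean { }                    // minimal MBean interface
    public static class Dummy implements DummyMBean { }

    // Returns true if a second registration under the same name fails.
    public static boolean clashes() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
            "kafka.producer:type=app-info,id=shared-client-id");
        server.registerMBean(new Dummy(), name);       // first producer: ok
        try {
            server.registerMBean(new Dummy(), name);   // second one, same id
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;                               // the exception you see
        } finally {
            server.unregisterMBean(name);              // close() would do this
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(clashes());  // true
    }
}
```

This also matches point #2 above: calling close() (here, unregisterMBean) frees the name for the next registration.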
I hope this helps. If you need more details on a specific topic, do not hesitate to ask.