Suppose I want to split the events of a persistent entity (e.g. FooEntity) so that they produce messages to different topics: some events should go to topic A and others to topic B. Consumers of both topics may also be interested in the same event, but the message format may differ between the two topics. Is it legitimate to implement this in the service as follows?
@Override
public Topic<A> topicA() {
    return TopicProducer.taggedStreamWithOffset(FooEntity.SHARDED_TAG.allTags(),
        (tag, offset) -> registry
            .eventStream(tag, offset)
            .filter(this::filterEventsOnlyForTopicA) // may include events for B
            .mapAsync(1, pair -> completedFuture(Pair.create(convertToA(pair.first()), pair.second()))));
}

@Override
public Topic<B> topicB() {
    return TopicProducer.taggedStreamWithOffset(FooEntity.SHARDED_TAG.allTags(),
        (tag, offset) -> registry
            .eventStream(tag, offset)
            .filter(this::filterEventsOnlyForTopicB) // may include events for A
            .mapAsync(1, pair -> completedFuture(Pair.create(convertToB(pair.first()), pair.second()))));
}
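
To make the intent concrete, here is a minimal sketch of what the filters and converters might look like, stripped of Lagom so that it runs on its own. The event classes (FooCreated, FooRenamed, FooDeleted), the String message formats, and the messagesForA/messagesForB helpers are all hypothetical and only for illustration; in the real service the filter and converter would receive a Pair of event and offset rather than a bare event.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class TopicRoutingSketch {

    // Hypothetical event types for FooEntity (assumed, not from the real service).
    interface FooEvent { String entityId(); }

    static final class FooCreated implements FooEvent {
        private final String id;
        FooCreated(String id) { this.id = id; }
        public String entityId() { return id; }
    }

    static final class FooRenamed implements FooEvent {
        private final String id;
        private final String newName;
        FooRenamed(String id, String newName) { this.id = id; this.newName = newName; }
        public String entityId() { return id; }
        String newName() { return newName; }
    }

    static final class FooDeleted implements FooEvent {
        private final String id;
        FooDeleted(String id) { this.id = id; }
        public String entityId() { return id; }
    }

    // Topic A is assumed to want creations and renames.
    static boolean filterEventsOnlyForTopicA(FooEvent e) {
        return e instanceof FooCreated || e instanceof FooRenamed;
    }

    // Topic B is assumed to want renames and deletions, so FooRenamed is shared.
    static boolean filterEventsOnlyForTopicB(FooEvent e) {
        return e instanceof FooRenamed || e instanceof FooDeleted;
    }

    // Same event, different message format per topic (plain strings here).
    static String convertToA(FooEvent e) {
        if (e instanceof FooRenamed) {
            return "A:renamed:" + e.entityId() + ":" + ((FooRenamed) e).newName();
        }
        return "A:created:" + e.entityId();
    }

    static String convertToB(FooEvent e) {
        if (e instanceof FooRenamed) {
            return "B:name-changed:" + e.entityId();
        }
        return "B:deleted:" + e.entityId();
    }

    // Stand-ins for the two topic streams: filter first, then convert.
    static List<String> messagesForA(List<FooEvent> events) {
        return events.stream()
            .filter(TopicRoutingSketch::filterEventsOnlyForTopicA)
            .map(TopicRoutingSketch::convertToA)
            .collect(Collectors.toList());
    }

    static List<String> messagesForB(List<FooEvent> events) {
        return events.stream()
            .filter(TopicRoutingSketch::filterEventsOnlyForTopicB)
            .map(TopicRoutingSketch::convertToB)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FooEvent> events = Arrays.asList(
            new FooCreated("1"), new FooRenamed("1", "bar"), new FooDeleted("1"));
        System.out.println(messagesForA(events));
        System.out.println(messagesForB(events));
    }
}
```

In this sketch FooRenamed passes both filters and is converted separately for each topic, which is exactly the overlap I am asking about.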