Lagom service consuming input from Kafka

I am trying to figure out how Lagom can be used to consume data from external systems communicating over Kafka.

I’ve run into this section of the Lagom documentation, which describes how a Lagom service can communicate with another Lagom service by subscribing to its topic.

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
    Flow.fromFunction(doSomethingWithTheMessage)
  )

However, what is the appropriate configuration when you want to subscribe to a Kafka topic that contains events produced by some random, external system?

Is some sort of adapter needed for this functionality?
To clarify, I have this at the moment:

object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

And I can invoke it via a simple POST request.
However, I would like it to be invoked by consuming Data messages from some (external) Kafka topic.

I was wondering if there is such a way to configure the descriptor in a fashion similar to this mockup:

override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _))
    .withAtMostOnceDelivery
}

I’ve run into this discussion on Google Groups, but in the OP’s question, I do not see that he is actually doing anything with the EventMessages coming from some-topic except routing them to the topic defined by his service.

EDIT #1: Progress update

Looking at the documentation, I decided to try the following approach.
I added 2 more modules, aggregator-kafka-proxy-api and aggregator-kafka-proxy-impl.

In the new api module, I defined a new service with no methods, but one topic which would represent my Kafka topic:

object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

In the impl module, I simply did the standard implementation:

class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
  }
}

Now, to actually consume these events, in my aggregator-impl module I added a “subscriber” service, which takes these events and invokes the appropriate commands on the entity.

class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}

This effectively allowed me to publish a message on the Kafka topic “data-in”, which was then proxied and converted to a RecordData command before being issued to the entity.

However, it seems somewhat hacky to me. I am coupled to Kafka by Lagom internals. I cannot swap the source of my data easily. For example, how would I consume external messages from RabbitMQ if I wanted to?
What if I’m trying to consume from another Kafka cluster (a different one than the one used by Lagom)?

Edit #2: More docs

I’ve found a few articles in the Lagom docs, notably this:

Consuming Topics from 3rd parties

You may want your Lagom service to consume data produced on services not implemented in Lagom. In that case, as described in the Service Clients section, you can create a third-party-service-api module in your Lagom project. That module will contain a Service Descriptor declaring the topic you will consume from. Once you have your ThirdPartyService interface and related classes implemented, you should add third-party-service-api as a dependency on your fancy-service-impl. Finally, you can consume from the topic described in ThirdPartyService as documented in the Subscribe to a topic section.

I also have this open on StackOverflow.

You have two options for consuming topics from 3rd parties:

  1. Create a Service Descriptor declaring the topic, as described in the documentation.

I’ve found a few articles in the Lagom docs

Yep :wink:

  2. Use Alpakka Kafka (which Lagom uses under the hood) directly.

I’ve used approach #1 and it seems to work.
But it is a pretty brittle solution, since it is coupled to Kafka, and Kafka alone, as an input, and on top of that, to that specific Kafka broker. What if I wanted to consume input from some 3rd-party Kafka then?

Do you have an idea, or a small code example, for approach #2, i.e. how I could use Akka Streams with Alpakka to achieve a more robust, modular architecture?

I know how to use them on their own, but I do not know how to integrate them into Lagom: its lifecycle, common/best practices, etc.

Lagom doesn’t impose any new restrictions.
You can use all the code from those docs in a Lagom service.

Do you have an idea

My vision:

  1. Add settings to application.conf.
  2. Create a singleton MyService and inject the Typesafe Config.
  3. Create a consumer in MyService using the injected configuration.
  4. Implement whatever logic you need for processing the received messages.

or a small code example

Sorry, no :disappointed:
Maybe someone else will answer and share their source code for this case. :wink:

Hi Ivan,

If I understand your request correctly, you have an external Kafka cluster running and you would like to subscribe to one of its topics.

The Lagom Broker API can only work with one Kafka cluster, meaning that when declaring publish topics in your descriptor, or subscribing, you can only configure one Kafka cluster per service.

The Lagom Broker API uses the service locator to locate the Kafka cluster endpoints. It looks them up under the kafka_native service name.

So to subscribe to an external topic you need to configure kafka_native to point to the desired Kafka cluster.
The configuration depends on your deployment:

  1. dev
     Static configuration (doc: Using static values for services and Cassandra):

lagom.services {
  kafka_native = "tcp://10.1.2.3:9092"
}

  2. Kubernetes
     For Kafka:
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: msrp
spec:
  ports:
  - name: "broker"
    protocol: "TCP"
    port: 9092
    targetPort: 9092
    nodePort: 0

---
apiVersion: v1
kind: Endpoints
metadata:
  name: kafka
  namespace: msrp
subsets:
  - addresses:
      - ip: 10.0.1.85
      - ip: 10.0.2.57
      - ip: 10.0.3.106
    ports:
      - name: "broker"
        port: 9092

The DNS SRV record will be:
_broker._tcp.kafka.msrp.svc.cluster.local
Check this for more details.

The only drawback when using the Lagom Broker API is, like I mentioned before, the restriction to one Kafka cluster per service.
There are cases where you want to use your own Kafka cluster internally and just subscribe to an external one. Currently this is not possible. If you only use one Kafka cluster then you are good to go.

I believe that in the future, with the help of the community, configuring a different service locator name per service API for subscribing will be possible.

All of this relates to the out-of-the-box functionality offered by the Lagom Broker API.
As @ihostage suggested, you can overcome this drawback by using Alpakka Kafka to implement your own subscriber, where you are free to configure access to the external Kafka cluster on your own, and still leverage the Lagom Broker API for your internal Kafka cluster.

Hope this helps.

BR,
Alan


Thank you for your replies.

For approach #2, let me see if I understood you correctly.

  1. Create a service API which abstracts over an Akka Streams source and offers a stream handle of type Source[T, Consumer.Control] (see the sketch after this list).
  2. In the service implementation of that API, do the actual wiring to the other Kafka cluster.
  3. Inject that API into my business-logic service and subscribe to the source.
  4. Do everything else as I would if I weren’t using Lagom.
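
Something like this, maybe (a very rough, untested sketch; ExternalDataService, the topic name and the bootstrap address are made up, and Data is the same message class my aggregator already uses):

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import org.apache.kafka.common.serialization.StringDeserializer

// api module: the only Kafka type that leaks out is the materialized Consumer.Control handle
trait ExternalDataService {
  def dataIn: Source[Data, Consumer.Control]
}

// impl module: all the wiring to the other Kafka cluster lives here
class ExternalDataServiceImpl(system: ActorSystem, deserialize: String => Data) extends ExternalDataService {

  private val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("external-kafka:9092") // not the cluster Lagom itself uses
      .withGroupId("aggregator")

  override def dataIn: Source[Data, Consumer.Control] =
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics("my-input-topic"))
      .map(record => deserialize(record.value())) // however the external producer encodes Data
}

The business service would then inject ExternalDataService, mapAsync over dataIn into entity commands and run the stream, much like my current DataKafkaSubscriber does.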

Is there a good way to handle offset committing, without leaking Kafka bits and pieces into business code?

Ivan,

Are you only using this external Kafka cluster in your service?

If you are only using the external Kafka cluster in your business service, then you can implement this using only the Lagom Broker API.
So you need to:

  1. Create an API with a service descriptor containing only the topic definition (this API is never implemented; see the sketch below).
  2. In your business service, configure kafka_native depending on your deployment (as I mentioned in the previous post).
  3. In your business service, inject the service from the API created in #1 and subscribe to it using the Lagom Broker API subscriber.

Offset committing in the Lagom Broker API subscriber is handled out of the box.
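
In code, #1 would look roughly like this (just a sketch; the service and topic names are illustrative, and Data is your existing message type):

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}

// third-party API module: a descriptor with only the topic, never implemented by anyone
trait ThirdPartyService extends Service {

  // Data needs the usual implicit serializer (e.g. a Play JSON Format), same as in your aggregate call
  def dataInTopic: Topic[Data]

  override final def descriptor: Descriptor = {
    import Service._

    named("third-party-service")
      .withTopics(
        topic("data-in", dataInTopic)
      )
  }
}

With kafka_native pointed at the external cluster (#2), your aggregator-impl then injects the ThirdPartyService client and subscribes to dataInTopic with the Lagom Broker API subscriber (#3), exactly like you already do.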


Yes, right now that is the case, and I have done exactly that prior to opening this thread, and it is working as expected.

However, as nothing is ever set in stone, I would like to explore a few alternatives in case a requirement for integration with another service/broker emerges.
One such requirement may be to simply connect to external Kafka cluster.
One might be to connect, for example, to RabbitMQ.

I know option #1 works, as I have tried it already. But I want to be ready for possibilities where option #1 is NOT an option (pun intended xD).

Sorry, it seems I missed your comment that you have already implemented it :slightly_smiling_face:

So you are interested in how to use/integrate Alpakka Kafka or Alpakka AMQP consumers in Lagom.
Kafka and AMQP consumer implementations require a persistent Akka stream, so you need to handle disconnects.
This can be done in two ways:

  1. Control the persistent Akka stream by wrapping it in an actor. You initialize your stream Flow on actor preStart and pipe the stream completion to the actor, which will then stop itself.
     If the stream completes or fails, the actor will stop.
     Then wrap the actor in a backoff supervisor with a restart strategy that will restart the actor in case of completion or failure and reinitialize the Flow.
  2. Use the Akka Streams delayed-restarts-with-a-backoff stage (see the sketch at the end of this post).

Personally I use #1 and have not tried #2 yet.

Initializing the backoff actor for #1, or the Flow for #2, can be done in your Lagom components trait (basically in the same place where you do your subscribe now using the Lagom Broker API).

Be sure to set a consumer group when configuring the consumer, to avoid duplicate consumption. You can use, like Lagom does, the service name from the descriptor as the consumer group name.
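
For completeness, a rough sketch of #2 (untested; the topic name, group id and bootstrap servers are placeholders, and handleRecord stands for your business logic):

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.RestartSource
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

// started from your Lagom components trait, in the same place as a Lagom Broker API subscribe
class ExternalTopicSubscriber(handleRecord: String => Future[Done])
                             (implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext) {

  private val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("external-kafka:9092") // the external cluster
      .withGroupId("aggregator")                   // service name as consumer group, like Lagom does

  // the backoff stage recreates the consumer source if it fails
  RestartSource
    .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("data-in"))
        .mapAsync(1) { msg =>
          // emit the offset only after the message has been processed (at-least-once)
          handleRecord(msg.record.value()).map(_ => msg.committableOffset)
        }
    }
    .runWith(Committer.sink(CommitterSettings(system)))
}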

Hope this helps.


Awesome!

Yeah, I figured leveraging Lagom for integration with Kafka would be preferable.

Anyway, my thanks to everyone who helped :smiley: