Lagom Kafka publishing multiple times

Hi,

I am using Lagom to publish to Kafka, with JDBC for persistence. I have deployed two instances for availability. My issue is that every time a request comes in, both instances publish to Kafka. Is there any configuration I can set in application.conf that will prevent this?

Below are my loader and my application.conf file:

class TweedLoader extends LagomApplicationLoader {

  override def load(context: LagomApplicationContext): LagomApplication =
    new TweedApplication(context) with ConfigurationServiceLocatorComponents

  override def describeService = Some(readDescriptor[TweedService])
}

abstract class TweedApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with JdbcPersistenceComponents
    with HikariCPComponents
    with LagomKafkaComponents
    with AhcWSComponents {

  override lazy val lagomServer: LagomServer = serverFor[TweedService](wire[TweedServiceImpl])
  lazy val appsImpl: AppsImpl = wire[AppsImpl]

  override def jsonSerializerRegistry: JsonSerializerRegistry = NotificationSerializerRegistry

  // Initialise cluster sharding for the notification entity
  val cs = clusterSharding.init(
    Entity(NotificationState.typeKey)(
      entityContext => NotificationBehavior.create(entityContext)
    )
  )
}

play.application.loader = com.tweed.impl.TweedLoader
play.http.secret.key="QCY?tAnfk?aZ?iwrNwnxIlR6CTf:G3gf:90Latabg@5241AB`R5W:1uDFN];Ik@n"
akka.discovery.method=config
lagom.cluster.bootstrap.enabled = false
lagom.cluster.join-self = true
akka.cluster.min-nr-of-members = 1
lagom.circuit-breaker {
  default.enabled = on
  default.call-timeout = 50s
  default.reset-timeout = 15s
  default.max-failures = 5
}
lagom.broker.kafka {
    service-name =""
    brokers = "kafkahost:9092"
}
db.default {
 driver = "com.mysql.cj.jdbc.Driver"
 url = "jdbc:mysql://mysqlhost/sample_db?charset=utf8&parseTime=True"
 username = "user"
 password = "password"
  async-executor {
    queueSize = 10000
    numThreads = 20
    minConnections = 20
    maxConnections = 20
  }
  hikaricp {
    minimumIdle = ${db.default.async-executor.minConnections}
    maximumPoolSize = ${db.default.async-executor.maxConnections}
  }
}

jdbc-defaults.slick {
  profile = "slick.jdbc.MySQLProfile$"
  jndiName = DefaultDS
}
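
For context, the topic itself is published from TweedServiceImpl with a standard Lagom TopicProducer, roughly along these lines (a simplified sketch with placeholder names, not the exact code):

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer

// Inside TweedServiceImpl; persistentEntityRegistry comes from JdbcPersistenceComponents.
// NotificationMessage, NotificationEvent.Tag and toMessage are placeholder names.
override def notificationsTopic: Topic[NotificationMessage] =
  TopicProducer.taggedStreamWithOffset(NotificationEvent.Tag) { (tag, fromOffset) =>
    persistentEntityRegistry
      .eventStream(tag, fromOffset)
      .map(envelope => (toMessage(envelope.event), envelope.offset))
  }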

Hi @sandeep,

the problem is that you are deploying two isolated processes. You should configure your processes so they connect to each other and form an Akka Cluster. Since neither process knows the other one exists, each of them runs all the workers needed for every projection (e.g. a TopicProducer), which causes the duplicates. The first thing you must fix is this setting:

lagom.cluster.join-self should only be used in local environments (development) where you are running a single-node cluster.
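
In production the instances instead need to discover each other and form a single cluster, typically via Akka Cluster Bootstrap. Something along these lines is the starting point (only a sketch, assuming Kubernetes API discovery; use the discovery method and service name that match your environment):

# Production sketch: let the nodes discover each other and form one cluster.
lagom.cluster.join-self = false
lagom.cluster.bootstrap.enabled = true

akka.management.cluster.bootstrap.contact-point-discovery {
  # kubernetes-api and the service name below are assumptions -- adapt to your setup
  discovery-method = kubernetes-api
  service-name = "tweed"
  # don't form a cluster until at least two contact points have been found
  required-contact-point-nr = 2
}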

Note that running multiple single-node clusters in production also has the potential to corrupt your Persistent Entities, as you may end up with multiple copies of the same data in memory.

I strongly suggest you fix the cluster as soon as possible.

Cheers,

Hi @ignasi35,

Thank you for your quick reply.

I have changed my config to form an Akka cluster using akka-dns, but I am facing a problem bringing up my application. Can you please have a look? Here are my updated conf file and loader:

play.application.loader = com.tweed.impl.TweedLoader
play.http.secret.key="QCY?tAnfk?aZ?iwrNwnxIlR6CTf:G3gf:90Latabg@5241AB`R5W:1uDFN];Ik@n"
akka.discovery.method=akka-dns

akka.management.cluster.bootstrap {
  contact-point-discovery {
    discovery-method = akka.discovery
    service-name = "brent.service.consul"
    required-contact-point-nr = 0
  }
}
lagom.circuit-breaker {
  default.enabled = on
  default.call-timeout = 50s
  default.reset-timeout = 15s
  default.max-failures = 5
}
lagom.broker.kafka {
    service-name =""
    brokers = "kafkahost:9092"
}
db.default {
 driver = "com.mysql.cj.jdbc.Driver"
 url = "jdbc:mysql://mysqlhost/sample_db?charset=utf8&parseTime=True"
 username = "user"
 password = "password"
  async-executor {
    queueSize = 10000
    numThreads = 20
    minConnections = 20
    maxConnections = 20
  }
  hikaricp {
    minimumIdle = ${db.default.async-executor.minConnections}
    maximumPoolSize = ${db.default.async-executor.maxConnections}
  }
}

jdbc-defaults.slick {
  profile = "slick.jdbc.MySQLProfile$"
  jndiName = DefaultDS
}

class BrentLoader extends LagomApplicationLoader {

  override def load(context: LagomApplicationContext): LagomApplication =
    new BrentApplication(context) with AkkaDiscoveryComponents

  override def describeService = Some(readDescriptor[BrentService])
}

abstract class BrentApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with JdbcPersistenceComponents
    with HikariCPComponents
    with LagomKafkaComponents
    with AhcWSComponents {

  override lazy val lagomServer: LagomServer = serverFor[BrentService](wire[BrentServiceImpl])
  lazy val appsImpl: AppsImpl = wire[AppsImpl]

  // Initialise cluster sharding for the alert entity
  val sharding: ActorRef[ShardingEnvelope[AlertCommand]] = clusterSharding.init(
    Entity(AlertState.typeKey)(
      entityContext => {
        AlertBehavior.create(entityContext)
      }
    )
  )

  override def jsonSerializerRegistry: JsonSerializerRegistry = AlertSerializerRegistry

}

Here is the error log I am getting continuously:

2021-02-01T16:30:23.685Z ERROR akka.management.cluster.bootstrap.internal.HttpContactPointBootstrap akkaAddress=akka://application@xxx.xx.xx.xx:25520, sourceThread=application-akka.actor.default-dispatcher-18, akkaSource=akka://application@xxx.xxx.xxx.xxx:25520/system/bootstrapCoordinator/contactPointProbe-YYY.YYY.YY.Y-8558, sourceActorSystem=application, akkaTimestamp=16:30:23.685UTC - Overdue of probing-failure-timeout, stop probing, signaling that it's failed