Resource exhaustion: huge number of "kafka-producer-network-thread | producer-xx" threads created

I’m having trouble with my producer process apparently exhausting its resources.
After sending a large batch of messages the JVM process has a large number of threads named as follows:
kafka-producer-network-thread | producer-xx

My producer instance is shared across threads and looks similar to this:

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import javax.inject.Inject
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

class MyKafkaProducer @Inject()(actorSystem: ActorSystem)(implicit mat: Materializer) {
  val config           = actorSystem.settings.config.getConfig("akka.kafka.producer")
  val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer)
  val producer         = Producer.plainSink(producerSettings) // note: this is a Sink, not a KafkaProducer

  def send(key: String, msg: String) = {
    val record = new ProducerRecord[String, String]("topic", key, msg)
    Source.single(record).runWith(producer)
  }
}

Do I need to release producer resources somehow after invoking send?

I had a similar problem - sorry for bumping, but I needed to share. Symptoms: a massive increase in stack space usage (each leaked network thread holds its own stack) while the heap stayed normal, producing a sawtooth memory graph as Kubernetes kept restarting the pod.

With Kafka producer logging turned on, I could see a new producer being created for each message, each one warning that it failed to register its JMX MBean (because an earlier producer had already registered under that name).

To fix it, I created a single KafkaProducer up front (separate from the sink), then called .withProducer on the settings before building the sink, forcing every materialization to reuse that one producer rather than creating a new one for each message passing through. Works a treat now - boring flatline memory graph.
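A minimal sketch of that approach, adapted to the class from the question (assumes Alpakka Kafka's ProducerSettings.createKafkaProducer() and .withProducer, which exist in the 1.x/2.x APIs; class, topic, and field names are just placeholders):

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import javax.inject.Inject
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

class MyKafkaProducer @Inject()(actorSystem: ActorSystem)(implicit mat: Materializer) {
  val config   = actorSystem.settings.config.getConfig("akka.kafka.producer")
  val settings = ProducerSettings(config, new StringSerializer, new StringSerializer)

  // Create ONE underlying KafkaProducer eagerly...
  val kafkaProducer = settings.createKafkaProducer()

  // ...and build the sink from settings that carry it, so every
  // materialization reuses this producer (and its single network thread)
  // instead of spawning a fresh one per send.
  val sink = Producer.plainSink(settings.withProducer(kafkaProducer))

  def send(key: String, msg: String) = {
    val record = new ProducerRecord[String, String]("topic", key, msg)
    Source.single(record).runWith(sink)
  }

  // Since the stream no longer owns the producer, close it ourselves
  // when the actor system shuts down.
  actorSystem.registerOnTermination(kafkaProducer.close())
}
```

With this wiring you should see exactly one "kafka-producer-network-thread | producer-1" thread for the lifetime of the process, no matter how many times send is called.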
