How to catch java.net.ConnectException: Connection refused on akka steram?

kafka
(Bifunctor) #1

Hi all

I have a kafka consumer that looks as the following:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.util.{Failure, Success}

object App {
  def main(args: Array[String]): Unit = {


    implicit val system = ActorSystem("SAP-SENDER")
    implicit val executor = system.dispatcher
    implicit val materilizer = ActorMaterializer()

    val config = system.settings.config.getConfig("akka.kafka.consumer")

    val consumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9003")
        .withGroupId("SAPSENDER")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.topics("TEST-TOPIC")
      )
      .runWith(Sink.foreach(println))
      .onComplete{
        case Success(_) => println("Goood")
        case Failure(ex) =>
          println(s"I am failed ==============> ${ex.getMessage}")
          system.terminate()
      }

  }
} 

The kafka server is not active and I would like just to terminate the consumer. It always tries to connect and shows the following messages:

19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.478 [SAP-SENDER-akka.kafka.default-dispatcher-20] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []   

it also says:

java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:173)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:515)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
	at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:380)
	at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:360)
	at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:221)
	at akka.actor.Actor.aroundReceive(Actor.scala:539)
	at akka.actor.Actor.aroundReceive$(Actor.scala:537)
	at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
	at akka.actor.Timers.aroundReceive(Timers.scala:51)
	at akka.actor.Timers.aroundReceive$(Timers.scala:40)
	at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
	at akka.actor.ActorCell.invoke(ActorCell.scala:579)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)  

How to catch up the ConnectException in the stream and stop consumer from trying to connect kafka.

The code is hosted here https://gitlab.com/akka-samples/kafkaconsumer.

Thanks a lot

(Enno) #2

Hi @bifunctor

The Kafka 2.x consumer APIs don’t expose the connection status. There is no built-in way to detect if you passed correct bootstrap server addresses.

See https://github.com/akka/alpakka-kafka/issues/674

Regards,
Enno.