如何在java中检测kafka代理是否不能从消费者处获得?

li9yvcax  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(312)

我有一个简单的javaKafka消费者。如果Kafka经纪人不在,我想抓住一个例外。我需要它来打断线程。
我有这样的代码:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties());
kafkaConsumer.subscribe(Arrays.asList(topic));
try {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
    // records handling
} catch(Exception e) {
    System.out.println(e.getMessage());
}finally {
    kafkaConsumer.close();
}

如果kafka服务器关闭,我不会捕获任何异常,但日志中会显示以下消息:

18/03/28 13:33:39 WARN clients.NetworkClient: [Consumer clientId=consumer-3, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.
18/03/28 13:33:40 WARN clients.NetworkClient: [Consumer clientId=consumer-1, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.

有没有办法在我的线程中处理它?

ppcbkaq5

ppcbkaq51#

像这样的方法可以奏效:

Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup(); 1
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

你知道吗 consumer.wakeup() 中断当前用户的操作。
这个 mainThread.join() 放在那里是为了确保主线程实际完成,并且在唤醒后不会在处理过程中关闭。请记住,shutdownhook也负责处理中断,而不仅仅是普通的程序关闭。

2lpgd968

2lpgd9682#

我解决这个问题有点笨拙,但对我有用。

public boolean testSocket(String serversList) {        
    String[] sockets = serversList.split(",");
    int unactive = 0;
    for (int i = 0; i < sockets.length; i++) {
        try {
            String[] socket = sockets[i].split(":");
            (new Socket(socket[0], Integer.valueOf(socket[1]))).close();
        } catch (IOException e) {
            unactive++;
        }
    }
    if (unactive < sockets.length) return true;
    return false;
}

相关问题