Kafka Java consumer has already been closed

Posted by qxgroojn on 2021-06-07 in Kafka

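I have just started using Kafka and have run into a small problem with a consumer that I wrote in Java.
I get this exception: IllegalStateException: This consumer has already been closed.
The exception is thrown on the following line: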

ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);

This started after my consumer crashed; when I try to run it again, it throws this exception.
The full code is below:

package StreamApplicationsTest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class StreamAppConsumer {

public static void main(String[] args){
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");

    Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    while(true){
        ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
        while(iterator.hasNext()){
            i++;
            ConsumerRecord<String,String> consumerRecord = iterator.next();
            String key = consumerRecord.key();
            String value = consumerRecord.value();
            if(key=="exit" || value=="exit")
                break;
            System.out.println("Key="+key+"\tValue="+value);
        }

        System.out.println("Messages processed = "+Integer.toString(i));
        consumer.close();

    }
}
}

I am stuck on this problem, so any help would be appreciated.

#1 w8rqjzmb

This seems to work:

public static void main(String[] args) {

        List<String> topics = new ArrayList<>();
        topics.add("test.topic");

        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP_TO_KAFKA_SERVER");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);

        System.out.println("Polling");
        ConsumerRecords<String, String> consumerRecords = consumer.poll(5000);

        try {
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
        } finally {
            consumer.close();
        }
    }

Make sure your Kafka server, whether remote or local, is reachable.
Output:

--- exec-maven-plugin:1.2.1:exec (default-cli) @ MVN ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
Polling
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6: test
7: tes
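
Regarding the reachability note in this answer, a quick way to rule out a network problem before debugging the consumer is a plain TCP connection test. The following is a minimal sketch using only the JDK; the class name `BrokerReachability` and the method `canConnect` are hypothetical helpers, not part of the Kafka client, and a successful connection only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of kafka-clients): checks whether a TCP
// connection to the broker address can be opened at all.
class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is the address used in the question; adjust as needed.
        System.out.println("reachable = " + canConnect("localhost", 9092, 2000));
    }
}
```

If this prints `reachable = false`, the address passed to BOOTSTRAP_SERVERS_CONFIG is probably the first thing to check.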

#2 hjzp0vay

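This happens because you close the consumer at the end of each iteration of the infinite loop, so the consumer is already closed when it polls a second time. To fix the immediate problem, I would wrap the whole while(true) loop in a try-catch and close the consumer in the catch or finally block.
However, a Kafka consumer that does not handle the various shutdown signals carefully can lose data. I recommend looking at Confluent's example of shutting a consumer down gracefully. In your case, since you are running on the main thread, it would look something like this: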

public static void main(String[] args) {
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");

    Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    Runtime.getRuntime().addShutdownHook(new Thread()
    {
      public void run() {
        consumer.wakeup();
      }
    });

    try {
      while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
          i++;
          ConsumerRecord<String, String> consumerRecord = iterator.next();
          String key = consumerRecord.key();
          String value = consumerRecord.value();
          // Compare string contents with equals(); == only compares references.
          if ("exit".equals(key) || "exit".equals(value))
            break;
          System.out.println("Key=" + key + "\tValue=" + value);
        }
        System.out.println("Messages processed = " + Integer.toString(i));
      }
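    } catch (WakeupException e) {
      // Expected on shutdown: wakeup() aborts the blocking poll(). Nothing to do.
      // (WakeupException lives in org.apache.kafka.common.errors.)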
    } finally {
      consumer.close();
    }
  }
}

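Basically, consumer.wakeup() is the only thread-safe method on the consumer, so it is the only one that can safely be called from Java's shutdown hook, which runs on a separate thread. Calling wakeup() makes the blocking poll() (or the next call to poll(), if the consumer is not currently blocked) throw a WakeupException, which ends the loop so that the finally block can close the consumer gracefully.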
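
To see the two behaviours side by side without a running broker, here is a minimal sketch that uses a stand-in class instead of the real client. `FakeConsumer`, `WakeupSignal`, and `ConsumerLifecycleDemo` are hypothetical names invented for this illustration; they only mimic the behaviour described above (poll() after close() throws IllegalStateException, and wakeup() from another thread makes poll() throw) and are not the Kafka API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for a consumer; NOT the Kafka API.
class FakeConsumer {
    // Plays the role that WakeupException plays in the real client.
    static class WakeupSignal extends RuntimeException {}

    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private volatile boolean closed = false;
    int closeCalls = 0;

    // Pretends to block for up to timeoutMs waiting for records.
    String poll(long timeoutMs) {
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        do {
            // A pending wakeup aborts this poll (or the next one).
            if (wakeupRequested.getAndSet(false)) {
                throw new WakeupSignal();
            }
            LockSupport.parkNanos(1_000_000L); // wait roughly 1 ms
        } while (System.nanoTime() < deadline);
        return "batch";
    }

    // Safe to call from another thread: it only sets a flag.
    void wakeup() { wakeupRequested.set(true); }

    void close() { closed = true; closeCalls++; }
}

class ConsumerLifecycleDemo {
    // Mirrors the question: close() inside the loop, so the second poll() fails.
    static String closeInsideLoop() {
        FakeConsumer consumer = new FakeConsumer();
        try {
            while (true) {
                consumer.poll(10);
                consumer.close();
            }
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Mirrors the answer: loop inside try, close() once in finally,
    // and wakeup() from another thread ends the loop.
    static int closeInFinally() {
        FakeConsumer consumer = new FakeConsumer();
        Thread stopper = new Thread(() -> {
            LockSupport.parkNanos(50_000_000L); // let a few polls run first
            consumer.wakeup();
        });
        stopper.start();
        try {
            while (true) {
                consumer.poll(10);
            }
        } catch (FakeConsumer.WakeupSignal e) {
            // Expected: this is the normal way out of the loop.
        } finally {
            consumer.close();
        }
        return consumer.closeCalls;
    }

    public static void main(String[] args) {
        System.out.println(closeInsideLoop()); // This consumer has already been closed.
        System.out.println(closeInFinally());  // 1
    }
}
```

With the real client, a shutdown hook would typically also wait for the main thread to finish (for example by joining it) so that close() completes before the JVM exits; treat that as a suggestion to verify against the Confluent example rather than something shown in the code above.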
