我刚开始使用Kafka。我面临着一个小问题与消费者。我用java编写了一个consumer。
我得到这个例外-非法状态例外这个消费者已经被关闭了。
我在下面一行遇到了异常:
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
这是在我的消费者崩溃后发生的,当我再次尝试运行它时,它给了我这个异常。
完整代码如下:
package StreamApplicationsTest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class StreamAppConsumer {
public static void main(String[] args){
int i = 0;
//List<String> topics = new ArrayList<>();
List<String> topics = Collections.singletonList("test_topic");
//topics.add("test_topic");
Properties consumerConfigurations = new Properties();
consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");
Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
consumer.subscribe(topics);
while(true){
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
while(iterator.hasNext()){
i++;
ConsumerRecord<String,String> consumerRecord = iterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
if(key=="exit" || value=="exit")
break;
System.out.println("Key="+key+"\tValue="+value);
}
System.out.println("Messages processed = "+Integer.toString(i));
consumer.close();
}
}
}
我只是被这个问题困住了,任何形式的帮助都是有用的。
2条答案
按热度按时间w8rqjzmb1#
这似乎管用
确保您的服务器或本地Kafka是可访问的
输出
hjzp0vay2#
这是因为您在无限循环结束时关闭消费者,所以当它第二次轮询消费者时,消费者已经关闭。为了解决眼前的问题,我会把整个
while(true)
在try-catch中循环,并在catch或finally块中处理consumer。但是,如果Kafka消费者不小心处理不同的关机信号,则可能会丢失数据。我建议看一下confluent的例子,在这里优雅地关闭消费者。在你的情况下,因为你在运行主线程,它看起来像这样。。。
基本运行
consumer.wakeup()
是使用者中唯一的线程安全方法,因此它是唯一可以在java的shutdown hook中运行的方法。由于在调用wakeup时消费者没有睡着,因此会触发wakeupexecution,从而优雅地关闭消费者。