java中kafka consumer挂在.hasnext上

kgqe7b3p  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(345)

我有一个简单的javaKafka消费者,代码如下

public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()&& !done){
            try {
                System.out.println("Parsing data");
                byte[] data = it.next().message();
                System.out.println("Found data: "+data);
                values.add(data); // array list
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
        done = true;
    }

当消息被发布时,数据被成功读取,但是当它返回到checking it.hasnext()时,它将保持挂起状态,并且永远不会返回。
有什么能拖延这件事?
mïu流是一个kafkastream,如下所示:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
   // m_stream is one of these streams
}
r3i60tvu

r3i60tvu1#

解决方法是添加属性
“consumer.timeout.ms”
现在,当达到超时时,将抛出consumertimeoutexception

gzszwxb4

gzszwxb42#

方法 hasNext() 正在阻塞。
您可以在属性中更改阻塞的超时 consumer.timeout.ms 请注意,它将抛出 TimeoutException 超时将过期时。
会阅读这些关于消费者的文档:https://cwiki.apache.org/confluence/display/kafka/consumer+group+examplehttps用法:/cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example

相关问题