consumeriterator对象上的java迭代似乎是永无止境的循环

5f0d552i  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(192)

我尝试在版本0.8.2.1中为ApacheKafka实现一个java客户机。我知道新版本的javaconsumerapi将在新版本的apachekafka中提供,但是现在我必须在当前版本中实现consumer客户机。
所以我已经做了,但是我在检查收到的消息的数量时遇到了问题。问题似乎是在对consumeriterator对象进行迭代以获取消息及其id。迭代似乎是永无止境的循环。请看以下代码:

public class TestKafkaConsumer extends Thread{

private final static Logger logger = Logger.getLogger(TestKafkaConsumer.class);
private Properties appProperties;
private String topic;
private ZookeeperConsumerConnector consumer;

public TestKafkaConsumer() throws Exception {
    topic = "topic";
    consumer = (ZookeeperConsumerConnector) Consumer.createJavaConsumerConnector(createConsumerConfig());
}

public static void main( String[] args ){
    PropertyConfigurator.configure("log4j.properties");
    try {
        TestKafkaConsumer testConsumer = new TestKafkaConsumer();
        testConsumer.start();
    } catch (Exception e) {
        logger.error("Error create consumer ", e);
    }

}

@Override
public void run() {
    logger.info("Consumer thread - start");
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = (Map<String, List<KafkaStream<byte[], byte[]>>>) consumer
            .createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    //logger.info("Count of consumed messages: " + it.length());
    long msgCount = 0;
    while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMetadata  = it.next();
            logger.info("Key: " + new String(messageAndMetadata.key()));
            logger.info("Message: " + new String(messageAndMetadata.message()));
            msgCount++;
        }
    logger.info("Summary count of consumed messages: " + msgCount); 
}

private ConsumerConfig createConsumerConfig() throws Exception{
    logger.info("createConsumerConfig - start");
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id","testGrp");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "10000");
    logger.info("createConsumerConfig - finish");
    return new ConsumerConfig(props);                
}

}

问题是日志文件中没有出现包含已消耗消息计数的日志条目,因此我认为迭代永远不会结束。我已经测试过所有的信息都被读取了。所以所有的记录都是迭代的,但程序不会留下循环。此外,我还注意到,在开始循环(取消注解appriopriate行)之前,当我尝试获取consumeriterator的长度(或大小)时,程序似乎在此处停止,循环甚至没有启动-日志文件中没有关于读取记录的条目。
问题出在哪里-课堂消费者?也许我做错了什么。如果有人遇到这样的问题能帮助我,我将不胜感激。

ilmyapht

ilmyapht1#

consumeriterator的javadoc,http://apache.mirrorcatalogs.com/kafka/0.8.2-beta/scala-doc/index.html#kafka.consumer.consumeriterator,表示“一个迭代器,它阻塞直到可以从提供的队列中读取值为止。”
这就是问题所在吗?

相关问题