I am trying to implement a Java client for Apache Kafka 0.8.2.1. I know that a new Java consumer API will ship with a newer Apache Kafka release, but for now I have to implement the consumer against the current version.
I have done that, but I ran into a problem when counting the messages received. The issue seems to be in the iteration over the ConsumerIterator object used to fetch the messages and their ids: the iteration looks like a never-ending loop. Please see the following code:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;

public class TestKafkaConsumer extends Thread {

    private final static Logger logger = Logger.getLogger(TestKafkaConsumer.class);

    private Properties appProperties;
    private String topic;
    private ZookeeperConsumerConnector consumer;

    public TestKafkaConsumer() throws Exception {
        topic = "topic";
        consumer = (ZookeeperConsumerConnector) Consumer.createJavaConsumerConnector(createConsumerConfig());
    }

    public static void main(String[] args) {
        PropertyConfigurator.configure("log4j.properties");
        try {
            TestKafkaConsumer testConsumer = new TestKafkaConsumer();
            testConsumer.start();
        } catch (Exception e) {
            logger.error("Error create consumer ", e);
        }
    }

    @Override
    public void run() {
        logger.info("Consumer thread - start");
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // logger.info("Count of consumed messages: " + it.length());
        long msgCount = 0;
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
            logger.info("Key: " + new String(messageAndMetadata.key()));
            logger.info("Message: " + new String(messageAndMetadata.message()));
            msgCount++;
        }
        logger.info("Summary count of consumed messages: " + msgCount);
    }

    private ConsumerConfig createConsumerConfig() throws Exception {
        logger.info("createConsumerConfig - start");
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testGrp");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "10000");
        logger.info("createConsumerConfig - finish");
        return new ConsumerConfig(props);
    }
}
The problem is that the log entry with the summary count of consumed messages never appears in the log file, so I assume the iteration never finishes. I have verified that all messages are read, so every record is iterated over, but the program never leaves the loop. I have also noticed that when I try to get the length (or size) of the ConsumerIterator before starting the loop (by uncommenting the appropriate line), the program seems to stop right there and the loop never even starts; there are no entries about read records in the log file.
Where is the problem, in the Consumer class? Maybe I am doing something wrong. I would appreciate help from anyone who has run into this kind of issue.
1 Answer
The javadoc for ConsumerIterator, http://apache.mirrorcatalogs.com/kafka/0.8.2-beta/scala-doc/index.html#kafka.consumer.consumeriterator, describes it as "an iterator that blocks until a value can be read from the supplied queue."
Could that be the problem?
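If the goal is to have the loop terminate once no more messages arrive for a while, one common approach with the 0.8.x high-level consumer is to set consumer.timeout.ms in the consumer properties; hasNext() then throws a ConsumerTimeoutException instead of blocking forever. Below is a minimal sketch along those lines (the topic name, group id and the 5-second timeout are just example values, not taken from the question):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class TimeoutConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testGrp");
        // If no message arrives within this interval, hasNext() throws
        // ConsumerTimeoutException instead of blocking (5000 ms is just an example).
        props.put("consumer.timeout.ms", "5000");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("topic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(topicCountMap);
        ConsumerIterator<byte[], byte[]> it = streams.get("topic").get(0).iterator();

        long msgCount = 0;
        try {
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
                msgCount++;
            }
        } catch (ConsumerTimeoutException e) {
            // No message within consumer.timeout.ms; treat the stream as drained for now.
        } finally {
            consumer.shutdown();
        }
        System.out.println("Summary count of consumed messages: " + msgCount);
    }
}

Alternatively, if the consumer is meant to keep running, you can leave the iterator blocking and call consumer.shutdown() from another thread when you want it to stop; the iteration then ends and the loop is left normally.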