我使用下面的代码来读取主题的数据,即“sha-test2”,但它读取的是完全不同的代码行,即20行中的10行。但当我运行console时,它会显示所有20行。即。bin/kafka-console-consumer.sh--缩放器localhost:2181 --topic sha-test2——从头开始
我怎么了?非常感谢你的帮助。
public class KafkaTestConsumer extends Thread {
//final static String clientId = "SimpleConsumerDemoClient";
final static String TOPIC = "sha-test2";
ConsumerConnector consumerConnector;
public static void main(String[] argv) throws
UnsupportedEncodingException {
KafkaTestConsumer helloKafkaConsumer = new KafkaTestConsumer();
helloKafkaConsumer.start();
}
public KafkaTestConsumer(){
Properties properties = new Properties();
properties.put("zookeeper.connect","172.23.32.35:2181");
properties.put("group.id","test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
System.out.println("consumerMap : \n " + consumerMap.toString() );
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("run started");
while(it.hasNext()){
System.out.println(new String(it.next().message()));
}
}
Thank you.
~Shyam
2条答案
按热度按时间7kqas0il1#
问题在于:
你告诉警察
consumerConnector
为主题创建单个使用者线程,但主题(显然)有两个分区。中的使用者线程数"test-group"
组应该等于或大于分区数,否则某些分区将不会被组读取,这正是您的情况。请看这个示例,其中线程数是通过命令行参数设置的。
或者,您可以从zookeeper中读取存储其元数据的分区的确切数目
/brokers/topics/your_topic_name/partitions
节点。tuwxkamq2#
你的代码看起来很好。这看起来像是一个抵消问题。高级消费者将其补偿存储在zookeeper中。
在你的情况下,这就是happened:- 1. 你在Kafka2中放了10条信息。你运行了消费代码,它成功地读取了所有10条信息。同时,消费者在zookeeper中将消耗的偏移量更新为10。三。你阻止你的消费者。4你又给Kafka发了10条信息。您再次启动消费代码。它只读取最后10条消息,而不是之前推送的10条消息,因为当您重新启动consumer时,它将检查zookeeper以找出从哪个偏移量恢复消费。
请尝试使用不同的组id重新运行您的使用者,或在从zookeeper中删除偏移量后重试。它应该很好用。