为什么kafka消费者表现缓慢?

laawzig2  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(406)

我有一个简单的主题,一个简单的Kafka消费者和生产者,使用默认配置。
这个程序很简单,我有两个线程。
在producer中,它不断发送16字节的数据。
而在消费者方面,它一直在接受。
我发现,生产商的吞吐量大约是10mb/s,这很好。
但是用户的吞吐量只有0.2mb/s。我已经禁用了所有的调试日志,但这并没有使它变得更好。测试正在本地计算机上运行。有人知道哪里出了问题吗?谢谢!
我使用的代码如下:producer:

KafkaProducer producer = new KafkaProducer(props);
int size = 16;
byte[] payload = new byte[size];
String key = "key";
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord("test",0,key.getBytes(),payload);
while(true){
producer.send(record);
}

消费者:

Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", "localhost:2181");
consumerProps.put("group.id", "test");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test");
ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
while(it.hasNext()){
    it.next().message();
}
kupeojn6

kupeojn61#

尝试使用以下属性配置使用者。 fetch.min.bytes fetch.max.wait.ms max.partition.fetch.bytes 此外,还可以调整 poll() 吞吐量方法。

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

相关问题