kafkastream上的低吞吐量

vptzau2j  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(755)

我的Kafka流吞吐量有些问题。我试着读一个有90万条记录的主题。我的kafka流应用程序基本上只打印每条记录,我的吞吐量达到每秒~4k条记录。但是,如果我使用基本的kafka avro console consumer命令行使用完全相同的主题,我将获得每秒约80k条记录的吞吐量!有没有一些已知的限制可以解释为什么流应用程序的性能不如kafka avro控制台消费者的性能?关于我应该调整哪个流配置以获得更好的性能,有什么指导吗?
我的配置是:

Properties configs = new Properties();
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer());
configs.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            kafkaConfig.getSchemaRegistryServer());
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "KS-test3");

拓扑结构是这样的:

StreamsBuilder streamsBuilder = new StreamsBuilder();
    streamsBuilder.stream(scheduleEventTopic)
                  .foreach(this::printRecord);
    return streamsBuilder.build();
qoefvg9y

qoefvg9y1#

尝试增加 max.poll.records 去更高的地方。此配置意味着您可以在一个 poll() ```
max.poll.records (1000 default)

你可能还想看看 `max.poll.interval.ms` 每次投票之间的时间,并尝试减少它,看看。
另外,您可能希望增加流线程的数量,并将其设置为您正在使用的主题的分区数。

num.stream.threads (1 default)

参考文献:https://docs.confluent.io/current/streams/developer-guide/config-streams.html
p、 s:默认值来自上述参考,您的可能会有所不同。
fkaflof6

fkaflof62#

我发现了我的问题。这个 commit.interval 设置为0以禁用聚合中的批处理。相反,我用了 cache.max.bytes.buffering 在不影响性能的情况下达到同样的效果。我的吞吐量从4k转为100k

相关问题