kafka配置最大字节数仍受500条消息的限制

v64noz0r  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(771)

我的消费者配置如下:
问题是,当我从测试主题(1个分区包含1000条消息)中轮询数据时,每次轮询只能得到500条消息。每条消息大约90字节一条。这个配置应该足够高,可以处理所有的数据。为什么会这样?

消费配置

public static KafkaConsumer<String, SpecificRecordBase> createConsumer(
            Arguments args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, args.groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "4500");

        // Data batching configuration
        properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");

        // Specify the number of bytes you want to read in batch
        properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, args.schemaRegistryUrl);

        return new KafkaConsumer<>(properties);
    }

投票器

.....
            while (true) {
                ConsumerRecords<String, SpecificRecordBase> records =
                        myConsumer.poll(Duration.ofSeconds(CONSUMER_POLL_SECONDS));
....

这里的记录是500
编辑:
读取默认轮询计数为500的文档。我需要哪种配置?我真的不关心消息的数量,我关心的是我流的字节数。

properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500000000");
afdcj2ne

afdcj2ne1#

存在使用者配置属性 max.poll.records 左你没有改变它的默认值是500。
如果您使用的是java使用者,那么还可以调整max.poll.records以调整每个循环迭代中处理的记录数。
参考:confluent kafka consumer properties
我记得我有一个类似的问题,但在我的情况下,这个问题是由一个字节的限制。

hiz5n14c

hiz5n14c2#

看来消费者方面的配置还可以。但您也应该考虑代理配置。在代理端有另一个大小限制,称为 message.max.bytes . 你也应该增加。
来自Kafka文件:
message.max.bytes:kafka允许的最大记录批大小。如果此值增加并且存在早于0.10.2的消费者,消费者的获取大小也必须增加,以便他们能够获取如此大的记录批。在最新的消息格式版本中,为了提高效率,总是将记录分组到批中。在以前的消息格式版本中,未压缩的记录不会分组到批中,在这种情况下,此限制仅适用于单个记录。可以使用主题级别max.message.bytes config为每个主题设置此限制(默认值:1000012)
您还可以查看此项以了解更多信息。

vhipe2zx

vhipe2zx3#

完成。
似乎您想要精确地控制代理将发送给您的消费者的字节数。实际上,您需要使用以下参数:

FETCH_MIN_BYTES_CONFIG

==>服务器应为获取请求返回的最小数据量。

FETCH_MAX_BYTES_CONFIG

==>代理应为获取请求返回的最大数据量。请记住,如果第一个非空分区的第一批记录的大小大于此值,则代理仍将返回它(以让使用者继续进行)。这不是绝对的最大值。

FETCH_MAX_WAIT_MS_CONFIG

==>如果没有足够的数据立即满足请求,则服务器在响应获取请求之前将阻止的最长时间。应该小于或等于轮询中使用的超时(timeout)如果要控制流的大小,使用此参数可能会很有效,但会增加延迟。

MAX_POLL_RECORDS_CONFIG

==>获取请求返回的最大记录数。正如在otheranswer中已经解释的那样,如果您想控制代理答案的大小,这个参数很重要。
如果 S 是预期的有效负载大小,以及 s 记录的平均预期大小,您应该确保max\u poll\u records\u config> S / s 请记住,您希望对有效负载(记录)大小的控制越多,您可能会招致更多的延迟(通过增加 FETCH_MAX_WAIT_MS_CONFIG ).

相关问题