我的消费者配置如下:
问题是,当我从测试主题(1个分区包含1000条消息)中轮询数据时,每次轮询只能得到500条消息。每条消息大约90字节一条。这个配置应该足够高,可以处理所有的数据。为什么会这样?
消费配置
public static KafkaConsumer<String, SpecificRecordBase> createConsumer(
Arguments args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, args.groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "4500");
// Data batching configuration
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");
// Specify the number of bytes you want to read in batch
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, args.schemaRegistryUrl);
return new KafkaConsumer<>(properties);
}
投票器
.....
while (true) {
ConsumerRecords<String, SpecificRecordBase> records =
myConsumer.poll(Duration.ofSeconds(CONSUMER_POLL_SECONDS));
....
这里的记录是500
编辑:
读取默认轮询计数为500的文档。我需要哪种配置?我真的不关心消息的数量,我关心的是我流的字节数。
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500000000");
3条答案
按热度按时间afdcj2ne1#
存在使用者配置属性
max.poll.records
左你没有改变它的默认值是500。如果您使用的是java使用者,那么还可以调整max.poll.records以调整每个循环迭代中处理的记录数。
参考:confluent kafka consumer properties
我记得我有一个类似的问题,但在我的情况下,这个问题是由一个字节的限制。
hiz5n14c2#
看来消费者方面的配置还可以。但您也应该考虑代理配置。在代理端有另一个大小限制,称为
message.max.bytes
. 你也应该增加。来自Kafka文件:
message.max.bytes:kafka允许的最大记录批大小。如果此值增加并且存在早于0.10.2的消费者,消费者的获取大小也必须增加,以便他们能够获取如此大的记录批。在最新的消息格式版本中,为了提高效率,总是将记录分组到批中。在以前的消息格式版本中,未压缩的记录不会分组到批中,在这种情况下,此限制仅适用于单个记录。可以使用主题级别max.message.bytes config为每个主题设置此限制(默认值:1000012)
您还可以查看此项以了解更多信息。
vhipe2zx3#
完成。
似乎您想要精确地控制代理将发送给您的消费者的字节数。实际上,您需要使用以下参数:
==>服务器应为获取请求返回的最小数据量。
==>代理应为获取请求返回的最大数据量。请记住,如果第一个非空分区的第一批记录的大小大于此值,则代理仍将返回它(以让使用者继续进行)。这不是绝对的最大值。
==>如果没有足够的数据立即满足请求,则服务器在响应获取请求之前将阻止的最长时间。应该小于或等于轮询中使用的超时(timeout)如果要控制流的大小,使用此参数可能会很有效,但会增加延迟。
==>获取请求返回的最大记录数。正如在otheranswer中已经解释的那样,如果您想控制代理答案的大小,这个参数很重要。
如果
S
是预期的有效负载大小,以及s
记录的平均预期大小,您应该确保max\u poll\u records\u config>S
/s
请记住,您希望对有效负载(记录)大小的控制越多,您可能会招致更多的延迟(通过增加FETCH_MAX_WAIT_MS_CONFIG
).