无法轮询/获取来自kafka主题的所有记录

7kjnsjlb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(302)

我正在尝试从一个特定的主题中轮询数据,比如kafka正在接收100条记录/秒,但大多数情况下它不会获取所有记录。我使用的超时为5000ms,我调用这个方法的每一天 100ms 注意:我也在订阅特定的主题
@已计划(fixeddelaystring=“100”)

public void pollRecords() {
        ConsumerRecords<String, String> records = 
        leadConsumer.poll("5000");

我怎样才能从Kafka那里得到所有的数据?

xytpbqjk

xytpbqjk1#

poll()返回的最大记录数用 max.poll.records 使用者配置参数(默认值为500)此外,还有另一个consumer config参数限制从服务器返回的最大数据量( fetch.max.bytes 以及 max.partition.fetch.bytes )
另一方面,在代理端有另一个大小限制,称为 message.max.bytes .
因此,您应该正确设置这些参数以获取更多消息。
来自Kafka文档(链接):
max.poll.records:对poll()的单个调用中返回的最大记录数(默认值:500)
fetch.max.bytes:服务器应为fetch请求返回的最大数据量。使用者分批获取记录,如果获取的第一个非空分区中的第一个记录批大于此值,则仍将返回该记录批以确保使用者可以取得进展。因此,这不是一个绝对的最大值。代理接受的最大记录批大小通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义。请注意,使用者并行执行多个获取(default:52428800)
message.max.bytes:kafka允许的最大记录批大小。如果此值增加并且存在早于0.10.2的消费者,消费者的获取大小也必须增加,以便他们能够获取如此大的记录批。在最新的消息格式版本中,为了提高效率,总是将记录分组到批中。在以前的消息格式版本中,未压缩的记录不会分组到批中,在这种情况下,此限制仅适用于单个记录。可以使用主题级别max.message.bytes config为每个主题设置此限制(默认值:1000012)
max.partition.fetch.bytes:服务器将返回的每个分区的最大数据量。消费者批量获取记录。如果获取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批,以确保使用者可以取得进展。代理接受的最大记录批大小通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义。有关限制使用者请求大小的信息,请参阅fetch.max.bytes(默认值:1048576)

相关问题