我无法在最新版本的kafka 0.10和0.11中找到simpleconsumer支持。在以下用例中使用高级消费者是否合适:
用例:我想手动控制每个主题的偏移量并保存在外部源代码中。
高级消费者的问题在于上面的用例:当我进行投票时,
1) 我可以得到信息,但我无法控制要阅读多少信息。
2) 我无法决定何时停止轮询,即我应该如何定义批量大小。
我无法在最新版本的kafka 0.10和0.11中找到simpleconsumer支持。在以下用例中使用高级消费者是否合适:
用例:我想手动控制每个主题的偏移量并保存在外部源代码中。
高级消费者的问题在于上面的用例:当我进行投票时,
1) 我可以得到信息,但我无法控制要阅读多少信息。
2) 我无法决定何时停止轮询,即我应该如何定义批量大小。
2条答案
按热度按时间prdp8dxp1#
您可以通过设置“enable.auto.commit”、“false”手动控制偏移量,只有当应用程序显式选择这样做时,才会提交偏移量。
consumer.commitsync();
api将提交poll()返回的最新偏移量,并在提交偏移量后返回,如果由于某种原因提交失败,则抛出异常。
如果您希望通过显式指定偏移量来更好地控制已提交的消息。
提交指定主题和分区列表的指定偏移量。
这是一个同步提交,将一直阻塞,直到提交成功或遇到不可恢复的错误(在这种情况下,它将被抛出给调用方)。
max.poll.records=>控制对poll()的单个调用所允许的最大记录数
jjjwad0x2#
Kafka康苏美尔阶级有自己的背景
max.poll.records
它指定可以获取的最大记录数-默认情况下,没有限制。KafkaConsumer
还允许您手动控制偏移,并将其存储在需要的位置。《Kafka:权威指南》一书(可从confluent网站免费获得)中有关于这些主题的很大一部分。