kafka:在producer事务之后进行轮询不会得到生成的消息

zc0qhyus  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(501)

我有一个消费转换生产应用程序与Kafka一次权杖。(transactional)produce阶段在同一主题上生成新消息,然后使用这些消息(transactional=read\u committed)。只有一个线程执行此操作,并确保在生产者的事务提交之后进行使用者轮询。现在我每轮只有一个民意测验。

测试用例

当我运行testcase时,有时可能会有一些消息是其他生产者在我的生产者的事务提交之前发送的(清晰可见)。然后我经历了以下几点:
我的单个poll语句只返回这个外来消息,但不返回刚才生成的消息,尽管上一轮的事务提交成功。

问题

我是否遗漏了某些东西,以致上一轮的交易结果对下一轮的消费者不可见?
我是否必须发出多个轮询,直到一个轮询返回0条记录,并告诉我该分区中服务器上的所有消息都已读取?
Kafka能不能告诉你当前分区上的所有消息都被读取了?也许没有什么像“我现在已经读完这个分区了”?

配置

交易消费者
final map consumerconfig=新建linkedhashmap<>();consumerconfig.put(consumerconfig.bootstrap\u servers\u config,server);consumerconfig.put(consumerconfig.client\u id\u config,id);consumerconfig.put(consumerconfig.group\u id\u config,group\u id);consumerconfig.put(consumerconfig.isolation_level_config,“read_committed”);consumerconfig.put(consumerconfig.auto\u offset\u reset\u config,“latest”);consumerconfig.put(consumerconfig.enable_auto_commit_config,“false”);consumerconfig.put(consumerconfig.max_poll_records_config,“100”);consumerconfig.put(consumerconfig.key_deserializer_class_config,stringdeserializer.class.getname());consumerconfig.put(consumerconfig.value\u deserializer\u class\u config,stringdeserializer.class.getname());
事务生产者
final map producerconfig=新建linkedhashmap<>();producerconfig.put(producerconfig.bootstrap\u servers\u config,server);producerconfig.put(producerconfig.transactional\u id\u config,id);producerconfig.put(producerconfig.key\u serializer\u class\u config,stringserializer.class.getname());producerconfig.put(producerconfig.value\u serializer\u class\u config,stringserializer.class.getname());
我的投票超时是2秒
我的理解是事务生产者自动幂等和acks=all
我的测试用例只有一个代理和一个复制。但我当然打算在生产中使用更多
我用Kafka2.0
我的主题只有一个分区
我的线程有自己的使用者组,并被分配给这个分区

wfveoks0

wfveoks01#

为了让您了解poll的工作原理,我们传递给poll()的参数是一个超时时间间隔,它控制在使用者缓冲区中没有数据时poll()将阻塞多长时间。如果设置为0,poll()将立即返回;否则,它将等待指定的毫秒数,以便数据从代理到达。因此,如果将轮询配置为0毫秒,并且数据缓冲区中没有数据,则不会收到任何数据。
至于您没有收到最近生成的数据,这取决于您对生产者的配置。除非生成的消息没有其副本,并且基于acks参数,否则该消息将可供使用者使用。
例如:如果您已将副本设置为3并且acks=all,除非所有复制程序都确认他们已接收到消息,否则此消息将不可供使用者使用。
说到问题,你怎么知道你是否已经读了整个分区,如果你的轮询不再给你任何记录(假设rest都正常工作),那么它表明你已经使用了该主题的所有消息。

相关问题