zookeeper中的Kafka偏移持久性

ha5z0ras  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(409)

我是新来的风暴/Kafka。我已经能够配置一个基本的工作原型:
Zookeeper3.4.5
Kafka2.11-0.9.0.1
storm 1.0.0(实际上是一个嵌入式java本地集群)
风暴-Kafka-1.0.0
我能够产生信息并从风暴拓扑中消耗它们。
我有一个关于Kafka的问题。
最初我在使用kafka实用程序时找不到java客户机中使用的组。
经过一番搜索,我读到风暴Kafka存储在zookeeper这个偏移量。如果我要在《Kafka风暴》中进行配置:
zkroot=“/我的\u根”
group.id=“我的团队”
然后我可以使用zookeeper zkcli.sh脚本检索偏移量。

get /my_root/my_group/partition_0
==> "topology":{},"offset":3148,..., "topic":"rawdatas"

我的问题是我不明白这个偏移量是如何以及多久更新一次的。在Storm中,我肯定会确认处理过的每个元组。
当拓扑开始,元组开始被处理时,我可以在zookeeper中看到偏移量的一点跳跃(例如:十几个),然后偏移量在很长时间内不会移动。
有时,我能看到一个更大的跳跃(如一千个),但它似乎是随机的。关于默认的kafka配置,我知道应该每2秒更新一次偏移量

// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;

我错过什么了吗?
弗兰克

6rqinv9w

6rqinv9w1#

在Kafka喷口代码潜水后,我更好地理解了我的问题。
这篇文章也帮助了我:http://www.developer.com/open/addressing-internal-apache-storm-buffers-overflowing.html
和往常一样,一切都是配置问题
在我的示例拓扑中,我有一个kafka喷口,它向一个简单的单螺纹螺栓发射元组,这个螺栓进行“密集计算”,我们可以用一个简单的thread.sleep(1000)来模拟
storm有一个限制,即在拓扑中可以有多少元组处于活动状态(默认值512)。在喷口发射和元组的最终确认之间也有一个超时,默认值为30秒。
我认为重要的是:
在toplogy start上,发出前512个元组,并开始由bolt处理。
大约30秒后,由于超时,喷口开始接收某些元组上的fail()。这些元组被添加到另一个要重播的风暴队列中。
默认的重播策略意味着大量的重播尝试。
在试图重放元组的所有时间内,zookeeper中无法提交高级偏移量。这就是为什么我看不到这个偏移。
一段时间后,重放队列使堆饱和,进程挂起。
在我的例子中,我只需要调整maxpoutpending和messagetimestart,以便在拓扑中有一个良好的流

StormTopology topology = builder.createTopology();
conf.setMaxSpoutPending(50);
conf.setMessageTimeoutSecs(120);

弗兰克

相关问题