我在Kafka中有一个消费者组。但是它的状态信息在多个经纪人之间不一致。这是怎么发生的?一旦发生了,我们如何减轻这种情况?
bin/kafka-consumer-groups.sh --bootstrap-server 10.32.218.112:9092 --describe --state --group consumer-group
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
10.32.218.112:9092 (1) range Stable 1
bin/kafka-consumer-groups.sh --bootstrap-server 10.32.67.102:9092 --describe --state --group consumer-group
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
10.32.218.112:9092 (1) range Stable 1
bin/kafka-consumer-groups.sh --bootstrap-server 10.33.150.9:9092 --describe --state --group consumer-group
Consumer group 'consumer-group' has no active members.
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
10.35.168.252:9092 (4) Empty 0
bin/kafka-consumer-groups.sh --bootstrap-server 10.35.168.252:9092 --describe --state --group consumer-group
Consumer group 'consumer-group' has no active members.
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
10.35.168.252:9092 (4) Empty 0
bin/kafka-consumer-groups.sh --bootstrap-server 10.33.21.48:9092 --describe --state --group consumer-group
Consumer group 'consumer-group' has no active members.
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
10.35.168.252:9092 (4) Empty 0
如果你能看到前2个经纪人认为经纪人1是协调人,但最后3个认为另一个经纪人是协调人。Kafka版本2.0.0。
更新
在同一个集群上,我可以看到一个消费者有两个协调器。这可能是一个集群级别的问题。但我不知道如何去调试这个问题。
bash kafka-consumer-groups.sh --bootstrap-server 10.35.168.252:9092 --describe --group consumer-group-1 --state
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
10.35.168.252:9092 (4) range Stable 34
bash kafka-consumer-groups.sh --bootstrap-server 10.32.218.112:9092 --describe --group consumer-group-1 --state
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
10.32.218.112:9092 (1) range Stable 14
更新
现在,我可以通过对一个新的使用者组a执行以下步骤来重现相同的问题。使用brokerip www.example.com启动kafka consumer和组cg110.32.218.112:9092
B.使用brokerip验证状态10.32.218.112:9092显示消费者为活动状态
c.使用brokerip 10.35.168.252:9092验证状态显示消费者未处于活动状态
d.使用brokeripwww.example.com启动具有组cg1的Kafka消费者10.35.168.252:9092
即,使用brokerip10.32.218.112:9092验证状态显示消费者为活动
f.使用brokerip验证状态10.35.168.252:9092显示消费者为活动
但两个代理报告的使用者id不同。此外,当我们停止两个使用者时,两个代理报告的上次读取提交偏移量也不同。确认两个使用者被单独处理。
1条答案
按热度按时间dxxyhpgq1#
首先让我们了解消费者组描述命令是如何工作的。
1.当我们使用n个代理ip运行该命令时,调用将随机发送到1个代理(假设为A)。
1.代理选择另一个它认为负载最少的代理(假设是B),并触发FIND_COORDINATOR请求。
1.使用者组(假设为consumerGroupName)的协调器是代理,它是__consumer_offset主题的partitionid =“consumerGroupName”的分区领导者。hashCode()%(__consumer_offset主题中的分区数)
例如,假设在__consumer_offset主题中有50个分区。使用者组“test-consumer-group”的协调器将是作为分区“test-consumer-group”的领导者的代理。hashCode()%50
在kafka 2.8.2之前的版本中,分区的数量是在kafka启动时加载的。从2.8.2开始,这个细节会定期刷新。但是我们应该避免更改__consumer_offset主题中的分区数量。
1.现在代理A知道了协调器代理,它将向协调器发送请求以获取最新的偏移量详细信息。
a.消费者从来没有生活过
B.__consumer_offset的TTL小于使用者组的最后活动时间。
c.__consumer_offset的复制因子为1,并且特定分区存在数据丢失。
在我们的示例中,我们增加了__consumer_offset主题的分区数(这是very wrong thing to do,因为它会使所有使用者群组杂凑不一致。),但也不会重新启动所有代理程式。因此,丛集中的某些代理程式会假设有x个分割区,而其他代理程式则会假设为y,因为在2.8.2版之前,此信息不会更新,除非我们重新启动Kafka经纪人。正因为如此,有时一些经纪人被选为协调人,但他们不知道消费者群体,因此我们得到的React不一致。