为给定主题的新使用者组设置初始“当前偏移量”和“延迟”

webghufk  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(355)

我正在开发一个产品,它可以根据用户使用产品的方式添加/删除消费者组。 enable.auto.commit 在我们的产品中是关闭的,相反,我们每次收到数据后都提交偏移量。
我们最近实现了一个将暂停/恢复产品的服务。kafka库(在nodejs中)还没有可用的暂停/恢复功能,所以我最终取消订阅/订阅了主题,而不是基于消费者群体,这似乎像我们预期的那样工作。
唯一的问题发生在添加新的消费群体时。首先,让我解释一下我看到的行为:
以下是消费者“group1”信息。。

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group philz-topic-group1

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
philz-topic                    1          33              33              0          rdkafka-3ac4d56e-e94b-4365-9af7-04e485502b5d      /10.233.113.109                rdkafka
philz-topic                    4          34              34              0          rdkafka-d642805c-f5ea-4450-9cb0-3272fcbbffc9      /10.233.88.251                 rdkafka
philz-topic                    0          23              23              0          rdkafka-12cfca8b-fd61-4a68-bc5f-1946c8ef4eb1      /10.233.120.55                 rdkafka
philz-topic                    2          26              26              0          rdkafka-7561ca2a-9894-4a3d-83fe-d379bbe64fdf      /10.233.126.40                 rdkafka
philz-topic                    3          20              20              0          rdkafka-cd9d5ed6-7daa-4b75-8f39-6704c8d887ed      /10.233.119.133                rdkafka

这是消费者的“group2”信息。。刚刚添加了消费者“group2”,并完成了一个操作。因此,单个操作的当前偏移量和延迟已更新。

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group philz-topic-group2

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
philz-topic                    3          -               20              -          rdkafka-b56306e1-b4b7-43fe-a604-ab7c12f70e9f      /10.233.119.133                rdkafka
philz-topic                    1          -               33              -          rdkafka-76c9a4d2-268b-4ebb-94a8-f1230c9bbfea      /10.233.113.109                rdkafka
philz-topic                    4          34              34              0          rdkafka-d412e574-8241-48c6-af26-c50be44eb51d      /10.233.126.40                 rdkafka
philz-topic                    0          -               23              -          rdkafka-33179a7d-cb9f-453a-83c6-e7e4780372b6      /10.233.88.251                 rdkafka
philz-topic                    2          -               26              -          rdkafka-77506e87-b666-4c92-82df-82071e2ff801      /10.233.120.55                 rdkafka

如果添加了新的使用者组,但未完成任何操作,则上述命令不会显示有关使用者组的任何信息。
我目前面临的问题是,当一个暂停/恢复操作发生时,消费者组的所有分区都没有更新的当前偏移量和延迟,当取消订阅/暂停并完成一个操作时,分区现在应该有1的延迟。但是,如果一个新的使用者组对于给定的分区没有任何先前的当前偏移量和延迟,那么该信息现在将被跳过,并且使用者组永远看不到。
我的问题是,在创建新的使用者组时,是否可以更新组的当前偏移量以匹配所有可用分区的日志结束偏移量?
我对Kafka不是很熟悉,所以这里的任何行为解释都是值得赞赏的。
我猜是因为我们自己 enable.auto.commit 当一个操作发生时,我们可以看到新使用者组的一些信息,但只能看到一个分区(刚刚接收数据的分区)被显示并用当前偏移量更新。
谢谢!
编辑:
另外,在我的示例中,每个使用者组有5个使用者和5个分区,因此每个分区应该有一个使用者

yyhrrdl8

yyhrrdl81#

感谢cricket\u007为Kafka消费者提供了必要的选择
消费者选择 auto.offset.reset 允许在示例化时自动设置使用者偏移量。通过将此选项的值设置为“earliest”,它将每个分区的当前偏移量设置为 LOG-END-OFFSET .
要使用节点库设置此选项,只需执行以下操作:

const consumer = new Kafka.KafkaConsumer(config, {
    'auto.offset.reset': 'earliest'
});

哪里 config 是用于使用者的键/值对配置,第二个参数是用于创建默认主题配置的键/值对配置。
配置是在使用者上设置的主题级配置,如下所述:https://github.com/edenhill/librdkafka/blob/0.11.1.x/configuration.md

相关问题