你知道如何使用命令行在kafka中使用消息时设置组名吗。
我尝试了以下命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --config group.id=test1
'config' is not a recognized option
目标是使用以下命令查找已使用消息的偏移量:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test1
有人能在这方面帮忙吗!!
提前谢谢!!
5条答案
按热度按时间watbbzwu1#
您可以像这样使用--group选项(使用kafka 2.0.0测试):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test --from-beginning
jvidinwx2#
得到了从命令提示符更改组名的答案!!
步骤:
创建新的
consumer.properties
文件,比如说consumer1.properties
.改变
group.id=<Give a new group name>
在consumer1.properties
.bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer.config config/consumer1.properties --delete-consumer-offsets
gywdnpxw3#
最简单的解决方案是:
bin/kafka-console-consumer.sh--缩放器localhost:2181 --topic nil\u rf2\u p2—从一开始—consumer property group.id=test1
在指定标志的情况下——从一开始就要记住使用者组在过去不应该消费任何记录,否则您的使用者将从指定组最早的未消费记录开始消费(而不是从实际开始,因为您可能会错误地假设)。
wkyowqbh4#
如果您使用的是bash,那么就可以使用它的进程替换功能。
bwitn5fc5#
如果要更改组id而不丢失记录的偏移量,则必须手动获取当前group.id的偏移量,并将其设置为具有新id的新运行使用者。如果没有任何控件在使用者示例中获取偏移量,则可以运行此命令。
然后可以从特定偏移量中查找数据。注意你应该在电话调查之后再打电话، 分配命令无效。您还可以在github中看到我的代码示例
这里举个例子