如何在主题开始时重新启动?

xzlaal3s  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(433)

我用的是Flume1.7Kafka source 从apache kafka中提取数据到我的 AbstractSink . 在过去,我可以通过使用删除主题偏移来重新开始主题开头的偏移 ./kafka-consumer-groups.sh --delete 但由于Flume1.7(显然)使用了一个“新的”消费者,因此 ./kafka-consumer-groups.sh --delete 现在给出以下错误消息:
选项[delete]对[new consumer]无效。请注意,无需删除新使用者的组元数据,因为当最后一个成员离开时,它会自动删除
那么,实现所需行为的推荐方法是什么(即我们将从主题开始重新处理数据)?
以下是我的flume配置的一部分:

myagent.sources.my-kafka-source.type = org.apache.flume.source.kafka.KafkaSource
myagent.sources.my-kafka-source.kafka.bootstrap.servers = kafka.example.net:9092
myagent.sources.my-kafka-source.kafka.consumer.group.id = my-gid
myagent.sources.my-kafka-source.kafka.topics = my.topic
myagent.sources.my-kafka-source.kafka.auto.offset.reset = earliest
myagent.sources.my-kafka-source.channels = my_channel
0aydgbwb

0aydgbwb1#

flume不直接支持倒带功能,尽管kafka附带kafkaconsumer#seek,允许您重新使用消息。似乎您必须使用一个新的组id来执行此操作,这需要重新启动flume代理。

相关问题