我用的是Flume1.7Kafka source
从apache kafka中提取数据到我的 AbstractSink
. 在过去,我可以通过使用删除主题偏移来重新开始主题开头的偏移 ./kafka-consumer-groups.sh --delete
但由于Flume1.7(显然)使用了一个“新的”消费者,因此 ./kafka-consumer-groups.sh --delete
现在给出以下错误消息:
选项[delete]对[new consumer]无效。请注意,无需删除新使用者的组元数据,因为当最后一个成员离开时,它会自动删除
那么,实现所需行为的推荐方法是什么(即我们将从主题开始重新处理数据)?
以下是我的flume配置的一部分:
myagent.sources.my-kafka-source.type = org.apache.flume.source.kafka.KafkaSource
myagent.sources.my-kafka-source.kafka.bootstrap.servers = kafka.example.net:9092
myagent.sources.my-kafka-source.kafka.consumer.group.id = my-gid
myagent.sources.my-kafka-source.kafka.topics = my.topic
myagent.sources.my-kafka-source.kafka.auto.offset.reset = earliest
myagent.sources.my-kafka-source.channels = my_channel
1条答案
按热度按时间0aydgbwb1#
flume不直接支持倒带功能,尽管kafka附带kafkaconsumer#seek,允许您重新使用消息。似乎您必须使用一个新的组id来执行此操作,这需要重新启动flume代理。