我使用kafka作为源代码,用storm构建了一个示例拓扑。这是一个我需要解决的问题。每次我删除一个拓扑并再次启动它时,拓扑就会从头开始处理。假设topicx中的消息是由topology处理的,然后我终止了topology。现在当我再次提交拓扑和消息时,仍然有主题x。再次处理。有没有一个解决方案,也许是某种补偿管理来处理这种情况。
wlzqhblo1#
你不应该使用 storm-kafka 对于新代码,它已弃用,因为在kafka中底层客户端api已弃用,并从2.0.0开始删除。相反,使用 storm-kafka-client .与 storm-kafka-client 您需要设置组id和第一个轮询偏移策略。
storm-kafka
storm-kafka-client
KafkaSpoutConfig.builder(bootstrapServers, "your-topic") .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST) .build();
以上将使你的喷口开始在最早的偏移量你第一次启动它,然后它会拿起它停下来,如果你重新启动它。kafka使用组id在喷口重新启动时识别喷口,这样就可以取回存储的偏移量检查点。其他偏移策略的行为会有所不同,您可以检查javadoc中的firstpolloffsetstrategy enum。喷口将定期检查它到达的距离,配置中也有一个设置来控制这一点。检查点由 setProcessingGuarantee 设置,并且可以设置为至少有一次(仅检查点确认的偏移量)、最多一次(检查点在喷口发出消息之前)和“任何时间”(定期检查点,忽略确认)。看一下storm中包含的一个示例拓扑https://github.com/apache/storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/kafkaspouttopologymainnamedtopics.java#l93.
setProcessingGuarantee
lhcgjxsq2#
在创建您的喷口配置时,请确保它有一个固定的喷口id,通过它可以在重新启动后标识自己。来自官方风暴现场:重要提示:重新部署拓扑时,请确保未修改spoutconfig.zkroot和spoutconfig.id的设置,否则spout将无法从zookeeper读取其以前的使用者状态信息(即偏移量),这可能会导致意外行为和/或数据丢失,具体取决于您的用例。
2条答案
按热度按时间wlzqhblo1#
你不应该使用
storm-kafka
对于新代码,它已弃用,因为在kafka中底层客户端api已弃用,并从2.0.0开始删除。相反,使用storm-kafka-client
.与
storm-kafka-client
您需要设置组id和第一个轮询偏移策略。以上将使你的喷口开始在最早的偏移量你第一次启动它,然后它会拿起它停下来,如果你重新启动它。kafka使用组id在喷口重新启动时识别喷口,这样就可以取回存储的偏移量检查点。其他偏移策略的行为会有所不同,您可以检查javadoc中的firstpolloffsetstrategy enum。
喷口将定期检查它到达的距离,配置中也有一个设置来控制这一点。检查点由
setProcessingGuarantee
设置,并且可以设置为至少有一次(仅检查点确认的偏移量)、最多一次(检查点在喷口发出消息之前)和“任何时间”(定期检查点,忽略确认)。看一下storm中包含的一个示例拓扑https://github.com/apache/storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/kafkaspouttopologymainnamedtopics.java#l93.
lhcgjxsq2#
在创建您的喷口配置时,请确保它有一个固定的喷口id,通过它可以在重新启动后标识自己。
来自官方风暴现场:
重要提示:重新部署拓扑时,请确保未修改spoutconfig.zkroot和spoutconfig.id的设置,否则spout将无法从zookeeper读取其以前的使用者状态信息(即偏移量),这可能会导致意外行为和/或数据丢失,具体取决于您的用例。