Apache风暴与Kafka补偿管理

mwngjboj  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(370)

我使用kafka作为源代码,用storm构建了一个示例拓扑。这是一个我需要解决的问题。
每次我删除一个拓扑并再次启动它时,拓扑就会从头开始处理。
假设topicx中的消息是由topology处理的,然后我终止了topology。
现在当我再次提交拓扑和消息时,仍然有主题x。再次处理。
有没有一个解决方案,也许是某种补偿管理来处理这种情况。

wlzqhblo

wlzqhblo1#

你不应该使用 storm-kafka 对于新代码,它已弃用,因为在kafka中底层客户端api已弃用,并从2.0.0开始删除。相反,使用 storm-kafka-client .
storm-kafka-client 您需要设置组id和第一个轮询偏移策略。

KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
            .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
            .build();

以上将使你的喷口开始在最早的偏移量你第一次启动它,然后它会拿起它停下来,如果你重新启动它。kafka使用组id在喷口重新启动时识别喷口,这样就可以取回存储的偏移量检查点。其他偏移策略的行为会有所不同,您可以检查javadoc中的firstpolloffsetstrategy enum。
喷口将定期检查它到达的距离,配置中也有一个设置来控制这一点。检查点由 setProcessingGuarantee 设置,并且可以设置为至少有一次(仅检查点确认的偏移量)、最多一次(检查点在喷口发出消息之前)和“任何时间”(定期检查点,忽略确认)。
看一下storm中包含的一个示例拓扑https://github.com/apache/storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/kafkaspouttopologymainnamedtopics.java#l93.

lhcgjxsq

lhcgjxsq2#

在创建您的喷口配置时,请确保它有一个固定的喷口id,通过它可以在重新启动后标识自己。
来自官方风暴现场:
重要提示:重新部署拓扑时,请确保未修改spoutconfig.zkroot和spoutconfig.id的设置,否则spout将无法从zookeeper读取其以前的使用者状态信息(即偏移量),这可能会导致意外行为和/或数据丢失,具体取决于您的用例。

相关问题