我正在从Kafka consumer阅读数据到Storm spout。但是,当我重新启动Storm时,它也会从Kafka读取以前处理过的记录。在重新启动时,我不想处理以前处理过的记录。下面是我的代码:
public class KafkaStormSample {
public static void main(String[] args) throws Exception {
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
}
}
2条答案
按热度按时间uidvcgyl1#
沿着静态UUID,您还可以使用
StormSubmitter
提交要在Storm集群上运行的拓扑。更多信息here0s7z1bwu2#
问题是你为SpoutConfig使用的随机UUID。相反,选择一个固定的字符串并每次使用它。
不相关:你不应该用
storm-kafka
来写新代码。用storm-kafka-client
来代替。