我集成了ApacheStorm1.0.3和ApacheKafka2.11-0.10.1.0。storm正确地阅读了Kafka主题中的一到两个主题,但是当第一个bolt确认元组时,这个确认并没有显示在storm ui中。有什么问题?
其他问题:当storm从kafka主题中读取十或十九条消息时,在该点storm ui显示20条确认消息,如果读取其他组19条消息确认添加20条以上。我不知道´我不明白为什么风暴用户界面显示喷口和螺栓的20分之20的acked。有人能解释一下storm ui控制台中acked和fail寄存器的逻辑吗?。
我的拓扑结构的配置是:
final TopologyBuilder myTopology = new TopologyBuilder();
KafkaConfiguration kconfig = new KafkaConfiguration();
SpoutConfig spout = kconfig.getKafkaConfiguration( args[0], args[1], args[2], args[3]);
myTopology.setSpout("spoutMvClient", new KafkaSpout(spout), 5);
myTopology.setBolt("boltTransformToObject", new TransformBolt(),7).globalGrouping("spoutMvClient");
myTopology.setBolt("boltMVClient", new MvClientBolt(), 6).fieldsGrouping("boltTransformToObject",new Fields("objectTarget"));
Config conf = new Config();
conf.setMaxSpoutPending(5000);
try {
StormSubmitter.submitTopology( "topologyOne", conf, myTopology.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
}
我的第一个转变是:
public void execute(Tuple input) {
try {
LOG.info(input.getString(0));
Transform transform = new Transform();
OpenTarget openTarget = transform.getObjetGenericFromFileXml(input.getString(0));
collector.emit(input, new Values(openTarget));
collector.ack(input);
} catch (Exception e) {
LOG.error(e.getMessage());
collector.fail(input);
}
}
1条答案
按热度按时间8zzbczxx1#
在调查之后,我了解了storm是如何通过storm ui实现元组和显示的。有一个默认配置,允许在storm ui中显示确认的号码:
此配置仅测量并显示5%的数据流。如果我们需要显示特定喷口或螺栓确认或失败的元组数,则必须将此配置更改为:
就像:
然后storm ui开始计数并正确显示拓扑上发生的已确认数:
最后,这里还有一个和我差不多的问题:由ack引起的风暴延迟