我正在使用Storm1.1.2和Kafka0.11构建一个JavaSpring应用程序,该应用程序将在docker容器中启动。
我的拓扑结构中的所有东西都按计划工作,但是在Kafka的高负载下,Kafka延迟会随着时间越来越大。
我的kafkaspoutconfig:
KafkaSpoutConfig<String,String> spoutConf =
KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
.build()
我的拓扑结构如下
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);
builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");
Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/opt/storm.jar");
StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());
routerbolt(扩展baserichbolt)执行一个非常简单的switch语句,然后使用本地kafkaproducer对象将新消息发送到另一个主题。就像我说的,所有的东西都会编译,拓扑也会像预期的那样运行,但是在高负载(3000条消息/秒)下,kafka延迟会堆积起来,相当于拓扑的低吞吐量。
我试过用
conf.setNumAckers(0);
和
conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);
但我想这不是一个问题。
我在storm ui上看到,routerbolt在高负载下的执行延迟为1.2ms,进程延迟为.03ms,这让我觉得喷嘴是瓶颈,并行提示是25,因为“mytopic”有25个分区。谢谢!
1条答案
按热度按时间mctunoxg1#
你可能会受到https://issues.apache.org/jira/browse/storm-3102,这使得喷口在每一次发射时都会发出相当昂贵的声音。请尝试升级到某个固定版本。
编辑:修复程序实际上还没有发布。您可能仍然希望通过使用例如。https://github.com/apache/storm/tree/1.1.x-branch 构建1.1.4快照。