Apache风暴Kafka喷口滞后问题

rbl8hiat  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(644)

我正在使用Storm1.1.2和Kafka0.11构建一个JavaSpring应用程序,该应用程序将在docker容器中启动。
我的拓扑结构中的所有东西都按计划工作,但是在Kafka的高负载下,Kafka延迟会随着时间越来越大。
我的kafkaspoutconfig:

KafkaSpoutConfig<String,String> spoutConf = 
     KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
     .build()

我的拓扑结构如下

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);

builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");

Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);

conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);

System.setProperty("storm.jar", "/opt/storm.jar");

StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());

routerbolt(扩展baserichbolt)执行一个非常简单的switch语句,然后使用本地kafkaproducer对象将新消息发送到另一个主题。就像我说的,所有的东西都会编译,拓扑也会像预期的那样运行,但是在高负载(3000条消息/秒)下,kafka延迟会堆积起来,相当于拓扑的低吞吐量。
我试过用

conf.setNumAckers(0);

conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);

但我想这不是一个问题。
我在storm ui上看到,routerbolt在高负载下的执行延迟为1.2ms,进程延迟为.03ms,这让我觉得喷嘴是瓶颈,并行提示是25,因为“mytopic”有25个分区。谢谢!

mctunoxg

mctunoxg1#

你可能会受到https://issues.apache.org/jira/browse/storm-3102,这使得喷口在每一次发射时都会发出相当昂贵的声音。请尝试升级到某个固定版本。
编辑:修复程序实际上还没有发布。您可能仍然希望通过使用例如。https://github.com/apache/storm/tree/1.1.x-branch 构建1.1.4快照。

相关问题