ack引起的风暴延迟

kqhtkvqz  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(439)

我用Kafka风暴连接Kafka和风暴。我有3个服务器运行zookeeper,kafka和storm。Kafka中有一个主题“test”,它有9个分区。
在storm拓扑中,kafkaspout executor的数量是9,默认情况下,任务的数量也应该是9。“提取”螺栓是唯一一个连接到Kafka斯波特的螺栓,“原木”喷口。
从用户界面来看,喷口的故障率非常高。但是,bolt中执行的消息数=发出的消息数-bolt中失败的消息数。当失败的消息在开始时为空时,此等式几乎匹配。
根据我的理解,这意味着螺栓确实收到了来自喷口的信息,但确认信号在飞行中暂停。这就是为什么喷口中的背包数量如此之少的原因。
这个问题可以通过增加超时秒数和抛出挂起消息数来解决。但这将导致更多的内存使用,我不能增加到无限。
我在想是否有办法强迫风暴忽略某个喷口/螺栓中的ack,以便它在超时前不会等待该信号。这将显著增加吞吐量,但不能保证消息处理。

ccrfmcuu

ccrfmcuu1#

你的容量数字有点高,这让我相信你真的在最大限度地利用系统资源(cpu,内存)。换句话说,系统似乎有点陷入困境,这可能就是元组超时的原因。你可以试着用 topology.max.spout.pending config属性来限制喷口的飞行中元组数。如果您可以适当地减少这个数量,那么拓扑应该能够有效地处理负载,而不需要元组超时。

7gcisfzg

7gcisfzg2#

如果您将应答器的数量设置为0,则storm将自动应答每个样本。

config.setNumAckers(0);

请注意,ui只测量并显示5%的数据流。除非你

config.setStatsSampleRate(1.0d);

尝试增加螺栓的超时时间并减少 topology.max.spout.pending .
另外,请确保spout的nexttuple()方法是非阻塞和优化的。
我还建议您分析代码,也许您的storm队列已满,您需要增加它们的大小。

config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,32);
    config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,16384);
    config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,16384);

相关问题