在将Storm1.1.0用于我的拓扑结构时,我遇到了一个问题,即storm会重新安排时间,或者让螺栓和喷口崩溃,因为拓扑结构的螺栓具有很高的延迟。
现在我创建了一个latencytest拓扑来测试和处理这个问题。
我有一个喷口,它发射随机值:
public void nextTuple() {
outputCollector.emit(new Values(Math.random()));
}
我有一个螺栓,它接收这些值并在特定时间内睡眠。
public void execute(Tuple tuple) {
double input = tuple.getDouble(0);
try {
Thread.sleep(this.latencyMS);
} catch (InterruptedException e) {
e.printStackTrace();
}
outputCollector.ack(tuple);
}
所以,如果我把latencyms设为10,我可以看到,风暴在3分钟内运行“很好”(在bolt中只有2000个acked元组)。然后,bolt延迟从10ms上升到60-100ms,storm开始“重新分配”(登录nimbus)执行者。然后ui中的所有统计信息都将变为0。
图1:没有崩溃,2000个元组有很高的延迟
图2:ui中没有信息
因为我正在处理真实拓扑中的文件,所以重新打开这些文件是不可接受的。
我在storm.yaml中使用了一些超时值和“config.setmaxpoutpending(200);”但似乎没有任何效果。我使用一个3节点的zookeeper集群和一个5节点的storm集群进行这个测试。
你有什么办法来解决或理解这个问题吗?我需要风暴继续,即使延迟很高。
1条答案
按热度按时间ttcibm8c1#
maxpoutpending不是超时配置,您应该配置的是
topology.message.timeout
.