我有一个从mqtt代理接收数据的拓扑,我希望一个喷口的行为如下:每x秒发出一批元组(或单个元组中的字符串列表)。我如何做到这一点?我读了一些关于风暴三叉戟的书,但它 IBatchSpout 似乎不允许我以特定的时间间隔批量发出元组。如果没有新数据进来,喷口该怎么办?它不能阻止线程,因为它是storm的主线程,对吗?
IBatchSpout
drkbr07n1#
您可以实现自己的mqtt喷口。以mongospout为例。重要的是 nextTuple 方法。调用此方法时,storm请求喷口向输出收集器发射元组。这个方法应该是非阻塞的,所以如果喷口没有要发射的元组,这个方法应该返回。nexttuple、ack和fail都是在spout任务中的一个线程中紧密循环调用的。当没有元组发出时,有礼貌的做法是在短时间内(比如一毫秒)进行下一次双倍睡眠,以免浪费太多cpu。不能一次等待指定的时间,但可以实现 nextTuple 所以它偶尔只发出一个元组。
nextTuple
private static final EMISSION_PERIOD = 2000; // 2 seconds private long lastEmission; @Override public void nextTuple() { if (lastEmission == null || lastEmission + EMISSION_PERIOD >= System.currentMillis()) { List<Object> tuple = pollMQTT(); if (tuple != null) { this.collector.emit(tuple); return; } } Utils.sleep(50); }
注意,我发现了一个开源的mqtt喷口。它看起来不适合生产,但你可以用它作为一个起点。
lc8prwob2#
除了christian之外,我还为storm的mqtt客户机找到了这个实现。前面提到的链接仍然没有开发。
2条答案
按热度按时间drkbr07n1#
您可以实现自己的mqtt喷口。以mongospout为例。
重要的是
nextTuple
方法。调用此方法时,storm请求喷口向输出收集器发射元组。这个方法应该是非阻塞的,所以如果喷口没有要发射的元组,这个方法应该返回。nexttuple、ack和fail都是在spout任务中的一个线程中紧密循环调用的。当没有元组发出时,有礼貌的做法是在短时间内(比如一毫秒)进行下一次双倍睡眠,以免浪费太多cpu。
不能一次等待指定的时间,但可以实现
nextTuple
所以它偶尔只发出一个元组。注意,我发现了一个开源的mqtt喷口。它看起来不适合生产,但你可以用它作为一个起点。
lc8prwob2#
除了christian之外,我还为storm的mqtt客户机找到了这个实现。前面提到的链接仍然没有开发。