创建一个ApacheStorm喷口,每x秒发射一个元组

chhqkbe1  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(408)

我有一个从mqtt代理接收数据的拓扑,我希望一个喷口的行为如下:
每x秒发出一批元组(或单个元组中的字符串列表)。我如何做到这一点?我读了一些关于风暴三叉戟的书,但它 IBatchSpout 似乎不允许我以特定的时间间隔批量发出元组。
如果没有新数据进来,喷口该怎么办?它不能阻止线程,因为它是storm的主线程,对吗?

drkbr07n

drkbr07n1#

您可以实现自己的mqtt喷口。以mongospout为例。
重要的是 nextTuple 方法。
调用此方法时,storm请求喷口向输出收集器发射元组。这个方法应该是非阻塞的,所以如果喷口没有要发射的元组,这个方法应该返回。nexttuple、ack和fail都是在spout任务中的一个线程中紧密循环调用的。当没有元组发出时,有礼貌的做法是在短时间内(比如一毫秒)进行下一次双倍睡眠,以免浪费太多cpu。
不能一次等待指定的时间,但可以实现 nextTuple 所以它偶尔只发出一个元组。

private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;

@Override
public void nextTuple() {
    if (lastEmission == null ||
            lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
        List<Object> tuple = pollMQTT();
        if (tuple != null) {
            this.collector.emit(tuple);
            return;
        }
    }
    Utils.sleep(50);
}

注意,我发现了一个开源的mqtt喷口。它看起来不适合生产,但你可以用它作为一个起点。

lc8prwob

lc8prwob2#

除了christian之外,我还为storm的mqtt客户机找到了这个实现。前面提到的链接仍然没有开发。

相关问题