动态输出键的splitstream(select)

67up9zun  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(530)

这是我的密码。

SplitStream<MonitoringEvent> splitStream =  inputStream.split(new OutputSelector<MonitoringEvent>() {

    @Override
    public Iterable<String> select(MonitoringEvent me) {

        List<String> ml = new ArrayList<String>();              
        ml.add(me.getEventType());                              
        return ml;
}

我有一连串的监控事件以随机顺序出现温度:80,压力:70,湿度:80,temp:30...
使用上面的代码,我将拆分流,按事件类型,即temperaturestream、pressuresestream。
问题是,如果我知道eventtype,我可以从splitstream中选择它,比如

splitStream.select('temperatureStream')

但是eventtype是动态的,不是预定义的。
我将如何为这个动态流应用cep。cep会是这样的,如果

temperate is > 90 for past 10 minutes ...

pressure is > 90 for past 10 minutes ...
xj3cbfub

xj3cbfub1#

如果我错了,请纠正我,但我认为不可能对selectdueflink的parallism进行动态查找。您的程序被翻译成flinks任务管理器的并行指令,作业管理器协调这些操作。如果没有对抽象语法树的全面了解,就根本无法应用parallism。。。也许你可以找到一些共同的属性,所有的消息共享和不同的地方

相关问题