这是我的密码。
SplitStream<MonitoringEvent> splitStream = inputStream.split(new OutputSelector<MonitoringEvent>() {
@Override
public Iterable<String> select(MonitoringEvent me) {
List<String> ml = new ArrayList<String>();
ml.add(me.getEventType());
return ml;
}
我有一连串的监控事件以随机顺序出现温度:80,压力:70,湿度:80,temp:30...
使用上面的代码,我将拆分流,按事件类型,即temperaturestream、pressuresestream。
问题是,如果我知道eventtype,我可以从splitstream中选择它,比如
splitStream.select('temperatureStream')
但是eventtype是动态的,不是预定义的。
我将如何为这个动态流应用cep。cep会是这样的,如果
temperate is > 90 for past 10 minutes ...
pressure is > 90 for past 10 minutes ...
1条答案
按热度按时间xj3cbfub1#
如果我错了,请纠正我,但我认为不可能对selectdueflink的parallism进行动态查找。您的程序被翻译成flinks任务管理器的并行指令,作业管理器协调这些操作。如果没有对抽象语法树的全面了解,就根本无法应用parallism。。。也许你可以找到一些共同的属性,所有的消息共享和不同的地方