在flink流媒体中是否可能知道本地窗口属于哪个子任务?我想使用 getRuntimeContext().getIndexOfThisSubtask() 中的方法 TriggerPolicy 实现。
getRuntimeContext().getIndexOfThisSubtask()
TriggerPolicy
uplii1fm1#
目前无法获得子任务的索引,而该子任务上的窗口运算符 TriggerPolicy 正在运行。但是,您可以通过放置 map 向每个数据元素分配子任务的当前索引的就地操作。
map
DataStream<Tuple2<Integer, String>> ds = env.fromElements( new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(2, "b"), new Tuple2<Integer, String>(1, "c"), new Tuple2<Integer, String>(2, "d")); ds.groupBy(0) .map(new RichMapFunction<Tuple2<Integer,String>, Tuple3<Integer, Integer, String>>() { @Override public Tuple3<Integer, Integer, String> map(Tuple2<Integer, String> integerStringTuple2) throws Exception { return new Tuple3<Integer, Integer, String>( getRuntimeContext().getIndexOfThisSubtask(), integerStringTuple2.f0, integerStringTuple2.f1); } }) .window(new TestingTriggerPolicy(), new TestingEvictionPolicy()) .mapWindow(new WindowMapFunction<Tuple3<Integer, Integer, String>, String>() { @Override public void mapWindow(Iterable<Tuple3<Integer, Integer, String>> iterable, Collector<String> collector) throws Exception { StringBuilder builder = new StringBuilder(); for (Tuple3<Integer, Integer, String> element : iterable) { builder.append(element.toString() +"; "); } collector.collect(builder.toString()); } })
1条答案
按热度按时间uplii1fm1#
目前无法获得子任务的索引,而该子任务上的窗口运算符
TriggerPolicy
正在运行。但是,您可以通过放置
map
向每个数据元素分配子任务的当前索引的就地操作。