如何知道本地窗口属于哪个子任务

6yjfywim  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(301)

在flink流媒体中是否可能知道本地窗口属于哪个子任务?我想使用 getRuntimeContext().getIndexOfThisSubtask() 中的方法 TriggerPolicy 实现。

uplii1fm

uplii1fm1#

目前无法获得子任务的索引,而该子任务上的窗口运算符 TriggerPolicy 正在运行。
但是,您可以通过放置 map 向每个数据元素分配子任务的当前索引的就地操作。

DataStream<Tuple2<Integer, String>> ds = env.fromElements(
        new Tuple2<Integer, String>(1, "a"),
        new Tuple2<Integer, String>(2, "b"),
        new Tuple2<Integer, String>(1, "c"),
        new Tuple2<Integer, String>(2, "d"));

ds.groupBy(0)
    .map(new RichMapFunction<Tuple2<Integer,String>, Tuple3<Integer, Integer, String>>() {
        @Override
        public Tuple3<Integer, Integer, String> map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
            return new Tuple3<Integer, Integer, String>(
                getRuntimeContext().getIndexOfThisSubtask(),
                integerStringTuple2.f0,
                integerStringTuple2.f1);
        }
    })
    .window(new TestingTriggerPolicy(), new TestingEvictionPolicy())
    .mapWindow(new WindowMapFunction<Tuple3<Integer, Integer, String>, String>() {
        @Override
        public void mapWindow(Iterable<Tuple3<Integer, Integer, String>> iterable, Collector<String> collector) throws Exception {
            StringBuilder builder = new StringBuilder();

            for (Tuple3<Integer, Integer, String> element : iterable) {
                builder.append(element.toString() +"; ");
            }

            collector.collect(builder.toString());
        }
    })

相关问题