//assume following logic
val source = arrayOf(1,2,3,4,5,6,7,8,9,10,11,12) // total 12 elements
val env = StreamExecutionEnvironment.createLocalEnvironment(1);
val input = env.fromCollection(source)
.countWindowAll(5)
.aggregate(...) // pack them to List<Int> for bulk upload to DB
.addSink(...) // sends bulk
当我执行它-只有前10个处理,但其余2个元素被扔掉-Flink关闭没有处理他们。
我唯一要避免的是-虽然我完全控制源数据,但我可以将一些已知的可忽略的\u值推送到源集合以适应窗口大小,然后在sink中忽略它们。。。但我觉得在Flink哪里有更专业的方法。
1条答案
按热度按时间jfgube3f1#
有一个12的有限流和一个每5个元素触发一次的窗口。所以第一个窗口得到5个元素,然后触发,接下来的5个被接收并触发,但是最后的2个出现了,作业知道不会再有更多的元素出现了。因此,由于窗口中没有5个元素,因此触发器不会触发,因此不会对它们执行任何操作。