我正在尝试使用Flink1.3.2进行第二层聚合,但随着时间的推移,我的结果似乎没有达到预期的效果。最初,我得到了预期的结果。
我在这里采取的方法有什么根本的错误吗?
我还没有找到其他人执行这种链式操作的好例子。
val myStream = sourceStream
.keyBy( 0 )
.timeWindow( Time.minutes(30) )
.reduce( (r1: myRow, r2: myRow) => { r1 + r2 },
(key: Any, window: TimeWindow, iterable: Iterable[myRow], out: Collector[myRow]) => { out.collect(iterable.iterator.next.setWindowStart(window.getStart)) } )
.map(tier2Row.fromMyRow(_))
.keyBy( 0 )
.timeWindow( Time.minutes(10) )
.reduce( _ + _ )
.addSink(new MyTier2RowSink)
1条答案
按热度按时间vbkedwbf1#
我最初使用基于时间的数据处理时间模式,结果很奇怪。我切换到摄取时间模式,分层聚合现在以预期的方式进行处理,除了设置模式外,对代码没有任何更改。