执行两层聚合的最佳方法是什么?

mutmk8jj  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(277)

我正在尝试使用Flink1.3.2进行第二层聚合,但随着时间的推移,我的结果似乎没有达到预期的效果。最初,我得到了预期的结果。
我在这里采取的方法有什么根本的错误吗?
我还没有找到其他人执行这种链式操作的好例子。

val myStream = sourceStream
  .keyBy( 0 )      
  .timeWindow( Time.minutes(30) )
  .reduce( (r1: myRow, r2: myRow) => {  r1 + r2  },
           (key: Any, window: TimeWindow, iterable: Iterable[myRow], out: Collector[myRow]) => { out.collect(iterable.iterator.next.setWindowStart(window.getStart)) }   )
  .map(tier2Row.fromMyRow(_))
  .keyBy( 0 )        
  .timeWindow( Time.minutes(10) )
  .reduce( _ + _ )
  .addSink(new MyTier2RowSink)
vbkedwbf

vbkedwbf1#

我最初使用基于时间的数据处理时间模式,结果很奇怪。我切换到摄取时间模式,分层聚合现在以预期的方式进行处理,除了设置模式外,对代码没有任何更改。

相关问题