基于时间戳同步apache flink流

46scxncf  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(379)

我有几个用例需要基于时间戳同步多个流。
下面是一个示例,我想同步交易栏和报价栏,例如,我从原始交易和报价中生成,我聚合了:

val tradeBars: DataStream[TradeBar] = trades
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new TimeTradeBar(new DownTick()))

val quotesWithFlow = quotes
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .countWindow(2, 1)
  .reduce((previousQuote, quote) => Quote.localOrderFlow(previousQuote, quote))
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")

val quoteBars: DataStream[QuoteBar] = quotesWithFlow
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new QuoteBars.TimeQuoteBar())

val joined: JoinedStreams[TradeBar, QuoteBar]#Where[LocalDateTime]#EqualTo = tradeBars
  .join(quoteBars)
  .where(_.start).equalTo(_.start)
  // need a window here, just want to sync on same time window

我试着使用flink的window join函数,但显然这需要一个window函数,然后我就可以使用apply方法了。我只想在同一时间窗口同步流。我怀疑这不是join方法的意图。
我有一个使用flink stream connect方法的工作实现。我将其应用于交易栏流和原始报价流,但这需要我自己编写一个相当混乱的协处理函数

CoProcessTradeBarsAndQuotes() extends CoProcessFunction[TradeBar, Quote, (TradeBar, QuoteBar)]
{}

这是相当混乱的,因为我必须跟踪缓冲区中的引号,并小心地从process1和process2函数执行聚合。我想一定有更简单的方法,我就是看不出来。感谢您的帮助和建议。

t5zmwmid

t5zmwmid1#

你没有提到你用来决定加入哪两个股票(可能很多)的逻辑,但一般来说,我会通过从第一个窗口函数(open,high,low,close,stock)生成一个输出记录来解决这个问题,其中一个额外的字段表示窗口的时间(截断为小时),然后按该时间字段输入,并执行另一个窗口操作,以创建所需股票的联接。

相关问题