我想我有一个相当非标准的用例。我想使用 filter
功能:
val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)
我还有一个时间戳提取器(传入事件将以xml形式附加一个时间戳):
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...
class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}
我之所以选择这种方法,是因为它不是简单地做一个流( val s = dataStream.filter(...).map(...).filter(...).map(...)
)因为我想建立一个可以分割/组合任意流的网络(例如s1+s2->c1,s1+s3->c2,c2+s4->c3,…)
现在,当通过上面的示例发送事件时,事件e1可能同时在s1和s2中结束。这意味着,在我的理解中,同一事件e1作为第一个示例放入s1(e1a),同时作为第二个示例放入s2(e1b)。
所以我现在要做的是把e1a和e1b重新组合成一个组合的e1,它类似于e1,s1和s2的变换。
我试过:
val c1 = s1.join(s2)
.where(_.key).equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })
然而,似乎这些事件从未达到apply函数,我也不知道为什么。
我的例子有什么问题?我的方法/想法会像这样的流网络工作吗?
1条答案
按热度按时间5vf7fwbs1#
你安排好有水印了吗?在处理事件时间时,只有当水印到达时才会触发窗口,该水印将事件时钟提前到窗口的末尾。您可以使用时间戳提取器/水印生成器来实现这一点;有关详细信息,请参阅文档中的示例。
如果其中一个流有时是空闲的,这也会导致问题,因为空闲流上缺少水印会阻止它所连接的任何流的水印。
取决于您正试图做什么,您可能会发现使用协处理函数比使用时间窗口连接更容易。以flink培训站点的有状态充实和过期状态练习为例。