我的要求是检查会话中的开始事件和成功事件。当然,我使用会话窗口,但似乎每个键都有重叠的窗口。我在网上搜索过,不知道为什么。
数据格式: myForm(timestamp, roomId, role, sessionId, event)
,例如:
myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844
myform(1559129977, 456, kid, 38239, begin) # timestamp equals to 2019-05-29 19:39:37
...
会话可能只有一对开始和成功事件,也可能有几对开始和成功事件。
活动可能会迟到,最多允许迟到3分钟。
我的钥匙是 roomId
+ role
+ sessionId
就像“123\u kid\u 37890”, seesionGap
是60年代
// use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = ... // from kafka, steam of myform
val sessionStream = stream
.assignTimestampsAndWatermarks(new MyFormEventWatermarks(0L))
.keyBy(mf => mf.roomId + "_" + mf.role + "_" + mf.sessionId)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(60 * 1000L))
.allowedLateness(Time.minutes(3))
.apply(myFormWindowFunction)
//MyFormEventWatermarks is :
class MyFormEventWatermarks[T <: AbstractForm](dely: Long) extends AssignerWithPeriodicWatermarks[T] {
private var currentMaxTimestamp = Long.MinValue
val maxOutOfOrderness = dely
@transient
var waterMark : Watermark = null
override def getCurrentWatermark: Watermark = {
if (currentMaxTimestamp == Long.MinValue){
waterMark = new Watermark(Long.MinValue)
waterMark
}
else{
waterMark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
waterMark
}
}
override def extractTimestamp(data: T, previousElementTimestamp: Long): Long = {
val timestamp = data.timestamp
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
timestamp
}
}
//window func is
class myFormWindowFunction extends RichWindowFunction ... {
...
override def apply(key: String, window: TimeWindow, input: Iterable[myForm], out: Collector[List[myForm]]): Unit = {
println("window is " + window.getStart() + "-" + window.getEnd() + "|" + data.tostring)
}
...
}
in方法 apply
的 myFormWindowFunction
,结果 println
比如:
// like this session data:
myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844
我有一扇Windows 2019-05-29 19:22:22.605- 2019-05-29 19:23:22.605
,数据为 myform(1559128942, 123, kid, 37890, begin)
,然后我得到了第二个窗口 2019-05-29 19:22:22.605 - 2019-05-29 19:23:24.844
数据是 myform(1559128942, 123, kid, 37890, begin), myform(1559128944, 123, kid, 37890, success)
. 它看起来像窗口初始化到(2019-05-29 19:22:22.605,2019-05-29 19:23:22.605)和(2019-05-29 19:22:24.844,2019-05-29 19:23:24.844),以及 onMerge
方法合并但不“删除”窗口(2019-05-29 19:22:22.605,2019-05-29 19:23:22.605)。我查过函数的源代码 EventTimeSessionWindows
还有flink会话窗口的例子,还不知道程序哪里出错了?
2条答案
按热度按时间dsf9zpds1#
在允许的延迟时间间隔内分配给窗口的事件的默认行为是在每个延迟事件添加到窗口时触发该窗口,但是也可以实现一个自定义触发器,在允许的延迟时间到期时触发,而不是在其他触发事件的同时触发。
请注意,对于会话窗口,延迟到达的事件可能会导致延迟合并。
您可能需要考虑水印延迟和允许延迟之间的权衡。因为您的水印延迟为零,所以您可能会有相当多的延迟事件(每次事件流不是完全按时间戳排序时)。例如,如果您使用3分钟作为水印延迟,并将allowed lateness设置为零,那么您将生成相同的最终结果,但是没有延迟触发和延迟合并,但是在每个窗口的初始触发之前有3分钟的延迟。
uqjltbpv2#
我发现了问题,我误解了允许性。当它被使用时,窗口被保存,当窗口+允许延迟时间到达时,窗口将被再次触发。