这是一个来自官方网站2.event time和listen端口的word count示例的修改版本
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//listening to the port
val text = env.socketTextStream("localhost", 9999)
.assignAscendingTimestamps(item => {
val line = item.split(" ")
//simply print timestamp
println(line.apply(1))
line.apply(1).toLong*1000 - 3000
})
执行下面的转换
// the process here
text.map { each_input =>
{
val line = each_input.split(" ")
(line.apply(0),1,line.apply(1))
}}
.process(new SimpleProcessFunc)
.print()
其实逻辑从过程功能上变化不大
val mark = context.timerService().currentWatermark()
val timestamp = context.timestamp()
//print some infomation
println(sdf.format(mark) + "===> watermark ===>" + mark)
println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
collector.collect(i)
我使用cmd通过socket发送数据,但是从ide控制台看,水印是如何生成的似乎很奇怪,后面似乎没有逻辑
1585977022
03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
2> (epoch,1,1585977022)
1585977034
04/04/2020 13:10:18===> watermark ===>1585977018999
04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
3> (montanin,1,1585977034)
1585977053
04/04/2020 13:10:30===> watermark ===>1585977030999
04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
4> (song,1,1585977053)
1条答案
按热度按时间rslzwgfq1#
以下是水印值背后的逻辑:
初始水印的值为long.min_,即-9223372036854775808。
碰巧的是,水印跟踪在流元素后面,流元素的时间戳被用作创建水印的基础。而水印则是关于某个时间点上的流的完整性的陈述。因此,时间1585977019000的流元素在时间1585977018999的水印之前(因为在水印之后可能还有时间1585977019000的另一个流元素,所以该水印的时间戳为1585977019000是错误的)。
升序时间戳水印生成器是一种周期性水印生成器,默认情况下,它每200毫秒生成一个新的水印,但前提是水印已提前。
在单个输入中访问当前水印时
ProcessFunction
,您将获取该示例接收到的最新水印。在processElement()
方法,则该水印将不会反映水印生成器在处理当前传递给的事件时所学习的内容processElement()
--水印更新将在200毫秒计时器关闭后进行。有关水印的更多信息,请参阅flink training中有关水印的页面。