我使用kafka consumer 0.9消费数据,在对有效事件进行一些初始筛选并Map到pojo之后,我设置ascendingtimestampextractor。
使用joda时间库,我将返回自epoch以来的毫秒数,如下所示: return DateTime.parse(evnt.timestamp).getMillis();
这似乎会导致 WARN org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor - Timestamp monotony violated: 1478048406982 < 1478051502295
当然顺序是不同的,但我不明白的是如何确保这些是有序的?我应该根据这个时间戳来订购这些,然后执行任何keyby或timewindow吗?
我真的没有找到太多的信息,所以任何帮助将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!