我有一个无序的流,我需要对它们进行排序,并在下一帧中用同一个字段对字段值求和。我的代码:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Message> messageswithTS = messages.assignTimestampsAndWatermarks(new TimeLagWaterMarkGenerator());
DataStream<Message> SumNumber = messageswithTS
.keyBy("deviceId")
.map(new Sumalo())
哪里 Sumalo()
是进行加法的函数。提取时间戳的代码:
public class TimeLagWaterMarkGenerator implements AssignerWithPeriodicWatermarks<Message> {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Message element, long previousElementTimestamp) {
long timestamp = element.getDate();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp);
}
}
结果是:
1 TRUE 0 21 1473861657491 6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87
1 TRUE 1 29 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3
1 FALSE 0 29 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3
....
正确结果:
1 TRUE 0 0 1473861657491 6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87
1 TRUE 1 4 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3
1 FALSE 0 4 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3
....
感谢您的帮助。
2条答案
按热度按时间zte4gxcn1#
flink不会自动按事件时间对事件时间流进行排序,也不会提供运算符对事件时间流进行排序(这只可能在事件时间进行排序,即将无序流转换为有序流)。
但是,您可以通过扩展
AbstractStreamOperator
. 这是一个低级接口,您可以在其中访问事件、其分配的时间戳和接收到的水印。操作员可按以下步骤操作。它可以将所有到达的元素插入到按事件时间排序的堆中。当水印到达时,它会发出时间戳小于水印的所有元素。如果延迟元素到达(即,时间戳小于当前水印的元素),您可以发射它(破坏流的完整顺序)或丢弃它。操作员还需要通过将堆保持为flink管理的状态来参与检查点。您应该知道这个接口是非常低级的,需要很好地理解flink的工作原理。此外,它可能在次要版本之间更改。关于时间戳和水印赋值器,您没有给水印添加任何延迟。有了这个实现,您可能会有许多后期元素。看一看这个
BoundedOutOfOrdernessTimestampExtractor
.ghhaqwfi2#
正如前面的答案所指出的,您需要在流的转换操作中创建一个自定义操作符。我发现这个链接非常有助于解决我所追求的类似问题。