flink时间戳,在流中添加计算字段

kb5ga3dv  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(726)

我有一个无序的流,我需要对它们进行排序,并在下一帧中用同一个字段对字段值求和。我的代码:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Message> messageswithTS = messages.assignTimestampsAndWatermarks(new TimeLagWaterMarkGenerator());
DataStream<Message> SumNumber = messageswithTS 
            .keyBy("deviceId")
            .map(new Sumalo())

哪里 Sumalo() 是进行加法的函数。提取时间戳的代码:

public class TimeLagWaterMarkGenerator implements AssignerWithPeriodicWatermarks<Message> {

private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;

@Override
public long extractTimestamp(Message element, long previousElementTimestamp) {
    long timestamp = element.getDate();
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    return timestamp;
}

@Override
public Watermark getCurrentWatermark() {
    return new Watermark(currentMaxTimestamp);
}
}

结果是:

1   TRUE    0   21  1473861657491   6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1   FALSE   3   3   1473861657496   c8b4617d-534b-4c5e-825c-a8c5556fcd87
1   TRUE    1   29  1473861657497   f5b72056-ec3d-4c97-b86d-73ed728757c3
1   FALSE   0   29  1473861657501   363d061d-ce02-4709-9683-b3bb233861f3
....

正确结果:

1   TRUE    0   0   1473861657491   6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1   FALSE   3   3   1473861657496   c8b4617d-534b-4c5e-825c-a8c5556fcd87
1   TRUE    1   4   1473861657497   f5b72056-ec3d-4c97-b86d-73ed728757c3
1   FALSE   0   4   1473861657501   363d061d-ce02-4709-9683-b3bb233861f3
....

感谢您的帮助。

zte4gxcn

zte4gxcn1#

flink不会自动按事件时间对事件时间流进行排序,也不会提供运算符对事件时间流进行排序(这只可能在事件时间进行排序,即将无序流转换为有序流)。
但是,您可以通过扩展 AbstractStreamOperator . 这是一个低级接口,您可以在其中访问事件、其分配的时间戳和接收到的水印。操作员可按以下步骤操作。它可以将所有到达的元素插入到按事件时间排序的堆中。当水印到达时,它会发出时间戳小于水印的所有元素。如果延迟元素到达(即,时间戳小于当前水印的元素),您可以发射它(破坏流的完整顺序)或丢弃它。操作员还需要通过将堆保持为flink管理的状态来参与检查点。您应该知道这个接口是非常低级的,需要很好地理解flink的工作原理。此外,它可能在次要版本之间更改。
关于时间戳和水印赋值器,您没有给水印添加任何延迟。有了这个实现,您可能会有许多后期元素。看一看这个 BoundedOutOfOrdernessTimestampExtractor .

ghhaqwfi

ghhaqwfi2#

正如前面的答案所指出的,您需要在流的转换操作中创建一个自定义操作符。我发现这个链接非常有助于解决我所追求的类似问题。

相关问题