I join two streams to create a new stream and then operate on it. The code is as follows:
DataStream<NewTableA> join1 =
    oldTableADataStream
        .keyBy(t -> t.getFa3())
        .join(tableBDataStream)
        .where(new oldTableAKeySelector())
        .equalTo(new TableBKeySelector())
        // event-time session windows with a gap of WIN_GAP_TIME ms
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(WIN_GAP_TIME)))
        .allowedLateness(Time.milliseconds(allowedLateness))
        .apply(new oldTableAJoinTableBFunc());
        //.assignTimestampsAndWatermarks(new assignTSAndWMLastMax<>(maxOutOfOrderness));

join1.process(
    new ProcessFunction<NewTableA, NewTableA>() {
        @Override
        public void processElement(NewTableA value, Context ctx, Collector<NewTableA> out)
                throws Exception {
            // debug output: the current watermark and the timestamp Flink attached to the record
            System.out.println(" NewTableA wmts:" + ctx.timerService().currentWatermark());
            System.out.println(" NewTableA ts:" + ctx.timestamp() + " " + value);
        }
    });
The code of oldTableAJoinTableBFunc is as follows:
import org.apache.flink.api.common.functions.JoinFunction;

// Builds one NewTableA record from each matching (OldTableA, TableB) pair in a window.
public class oldTableAJoinTableBFunc implements JoinFunction<OldTableA, TableB, NewTableA> {
    @Override
    public NewTableA join(OldTableA oldTableA, TableB tableB) throws Exception {
        //System.out.println("join1 on");
        NewTableA newTableA = new NewTableA();
        newTableA.setPA1(oldTableA.getPa1());
        newTableA.setA2(oldTableA.getA2());
        newTableA.setFA3(oldTableA.getFa3());
        newTableA.setFA4(oldTableA.getFa4());
        newTableA.setB2(tableB.getB2());
        newTableA.setB3(tableB.getB3());
        // important: copy the original event time into the payload field ts
        newTableA.setTs(oldTableA.getTs());
        return newTableA;
    }
}
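In the example above, oldTableADataStream is joined with tableBDataStream to produce join1, an event-time stream.

I noticed something interesting: the timestamps of the events in join1 are generated automatically by Flink.

When I created the test data for oldTableADataStream and tableBDataStream, I deliberately set all timestamps between 1000000010 and 1000000044. But after the join and apply functions, the event timestamps in the new stream join1 were changed by Flink, as the following output shows: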
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=10, A2='a20', FA3=21, B2='b21', B3='b31', FA4=39, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=1, A2='a20', FA3=20, B2='b20', B3='b30', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000010}
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=11, A2='a20', FA3=21, B2='b21', B3='b31', FA4=40, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000135 NewTableA{PA1=21, A2='a20', FA3=22, B2='b22', B3='b32', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=38, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=57, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA ts:1000000143 NewTableA{PA1=2, A2='a20', FA3=20, B2='b20', B3='b30', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=12, A2='a20', FA3=21, B2='b21', B3='b31', FA4=41, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000135 NewTableA{PA1=22, A2='a20', FA3=22, B2='b22', B3='b32', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=58, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=13, A2='a20', FA3=21, B2='b21', B3='b31', FA4=42, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=39, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=59, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA ts:1000000135 NewTableA{PA1=23, A2='a20', FA3=22, B2='b22', B3='b32', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000143 NewTableA{PA1=3, A2='a20', FA3=20, B2='b20', B3='b30', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=14, A2='a20', FA3=21, B2='b21', B3='b31', FA4=43, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=40, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000016}
NewTableA ts:1000000135 NewTableA{PA1=24, A2='a20', FA3=22, B2='b22', B3='b32', FA4=33, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
......
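There seems to be no rule behind the new events' timestamps. How are 1000000143, 1000000138, 1000000135 and so on calculated? They do not seem to be related to the watermark either, because the watermark is 1000000042, which also differs from these event timestamps.

What rule does the operation use to generate the new event timestamps? I haven't found any official documentation on this; can anyone point me to a link?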
1 answer

y53ybaqx:
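The timestamp of an output event produced by a time window is set to the window's maximum timestamp (for a TimeWindow, its end minus 1 ms). So for a session window it is essentially the end boundary of the session that contains the event.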
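The rule can be written as a small worked equation for one session window $w$ of a single key. Here $g$ is the session gap (WIN_GAP_TIME) and $t_{\text{last}}$ is the latest event timestamp from either input that fell into the session:

$$
t_{\text{out}} = \operatorname{maxTimestamp}(w) = \operatorname{end}(w) - 1 = t_{\text{last}} + g - 1
\quad\Longrightarrow\quad
t_{\text{last}} = t_{\text{out}} - g + 1
$$

Assuming, purely for illustration, $g = 100$ ms (the question does not state the value of WIN_GAP_TIME), the observed 1000000143 would correspond to a last event at 1000000044, 1000000138 to 1000000039, and 1000000135 to 1000000036.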
Since you keyed the session windows by fa3, the values you get depend on fa3: 20 and 21 give 1000000143, 22 gives 1000000135, and 23 and 24 give 1000000138.
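To make the per-key behaviour concrete, here is a minimal, Flink-free Java sketch (Java 16+ for records) of how event-time session windows for one key are formed and which timestamp their output receives. It assumes Flink's TimeWindow semantics: each element opens [ts, ts + gap), windows that touch or overlap are merged, and results are emitted at end - 1. The class SessionWindowTimestampSketch, the 100 ms gap, and the input timestamps are hypothetical; this is not Flink's actual merging code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: mimics session-window merging for ONE key, not Flink's real implementation.
public class SessionWindowTimestampSketch {

    /** Half-open window [start, end); maxTimestamp() = end - 1, as assumed for Flink's TimeWindow. */
    public record Window(long start, long end) {
        public long maxTimestamp() {
            return end - 1; // timestamp attached to every record this window emits
        }
    }

    /** Each element contributes [ts, ts + gap); windows that touch or overlap are merged. */
    public static List<Window> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Window> result = new ArrayList<>();
        Window current = null;
        for (long ts : sorted) {
            Window next = new Window(ts, ts + gap);
            if (current != null && next.start() <= current.end()) {
                // touches or overlaps the open session: extend it
                current = new Window(current.start(), Math.max(current.end(), next.end()));
            } else {
                if (current != null) {
                    result.add(current); // close the previous session
                }
                current = next;
            }
        }
        if (current != null) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 100L; // hypothetical WIN_GAP_TIME; the question does not state its value
        // hypothetical event timestamps (from both inputs) for a single fa3 key
        List<Long> tsForOneKey = List.of(1000000010L, 1000000011L, 1000000044L);
        for (Window w : sessions(tsForOneKey, gap)) {
            System.out.println("window " + w + " -> output ts " + w.maxTimestamp());
        }
    }
}
```

With these made-up inputs all three events fall into one session [1000000010, 1000000144), so every joined record from that session would carry 1000000143, regardless of the ts value stored in its payload. Keys whose last event arrives earlier end their session earlier, which is why different fa3 values see different output timestamps.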