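In Apache Flink, I have two streams of Tuple8. Four of the eight fields of each event tuple (a Tuple4) act as the key. I want to correlate the records that are present in both streams, and as a step toward that I use the join operator to join the two streams. According to the semantics, I should get an output stream containing the inner-joined records. However, I get no output and no matches. The time characteristic of env is set to event time. The first element of the tuple is the timestamp, which I extract and assign as the event timestamp using assignTimestampsAndWatermarks.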
    DataStream<String> input = env.readTextFile("/tmp/logScrape/out/raw-input.out");

    DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> inFiltered =
        input.flatMap(new Splitter())
            .filter(new InFilter())
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) {
                    return record.f0;
                }
            });

    DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> exitFiltered =
        input.flatMap(new Splitter())
            .filter(new ExitFilter())
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) {
                    return record.f0;
                }
            });

    inFiltered.join(exitFiltered)
        .where(new TupleKeySelector())
        .equalTo(new TupleKeySelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000000)))
        .apply(new StreamJoinner())
        .writeAsText("/tmp/logScrape/out/output");

    public static class TupleKeySelector implements KeySelector<
            Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>,
            Tuple4<String, Integer, String, Integer>> {
        @Override
        public Tuple4<String, Integer, String, Integer> getKey(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> value) {
            return new Tuple4<>(value.f2, value.f3, value.f4, value.f5);
        }
    }

Here are the records from the inFiltered stream:
(1519254461076381189,1234,program1,11,program2,20,27,in)
(1519254462071697685,1234,program1,11,program2,20,27,in)
(1519254463067014246,1234,program1,11,program2,20,27,in)
Here are the records from the exitFiltered stream:
(1519254458167640292,6789,program1,11,program2,20,27,out)
(1519254460158076301,6789,program1,11,program2,20,27,out)
(1519254461153294238,6789,program1,11,program2,20,27,out)
(1519254462148512207,6789,program1,11,program2,20,27,out)
(1519254463143730191,6789,program1,11,program2,20,27,out)
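Questions:
Is there anything I am missing here that prevents me from seeing results?
Is there any way to debug the code while it is processing? I am not sure whether, in my case, the key selection is the problem or the windowing is not happening correctly.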
1 Answer
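You have a tumbling window of 1000000 milliseconds, right? Looking at the timestamps (the first field, right?) of the two filtered streams, I don't see any events that happen within the same 1 millisecond. The values appear to be nanoseconds since the epoch, while Flink interprets the extracted timestamp as milliseconds, so a window of 1000000 units covers only 1 millisecond of real time, and no record from inFiltered lands in the same window as a record from exitFiltered.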
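
To make the point above concrete, here is a minimal plain-Java sketch (no Flink dependency) that assigns the sample timestamps from the question to tumbling windows. The class and its helper methods are hypothetical names introduced for illustration. The window start is computed as `timestamp - timestamp % size`, which is an assumption about how a tumbling window with zero offset aligns for non-negative timestamps. The conversion to milliseconds assumes the values really are nanoseconds.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: checks whether two sets of
// timestamps can ever meet in the same tumbling window.
public class WindowAlignmentCheck {

    // Start of the tumbling window containing `timestamp`
    // (assumes offset 0 and a non-negative timestamp).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // True if at least one left/right pair falls into the same window.
    static boolean sharesAnyWindow(long[] left, long[] right, long windowSize) {
        for (long l : left) {
            for (long r : right) {
                if (windowStart(l, windowSize) == windowStart(r, windowSize)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Assumption: the raw values are nanoseconds since the epoch.
    static long[] nanosToMillis(long[] nanos) {
        return Arrays.stream(nanos).map(n -> n / 1_000_000L).toArray();
    }

    public static void main(String[] args) {
        // First fields of the sample records shown in the question.
        long[] in = {1519254461076381189L, 1519254462071697685L, 1519254463067014246L};
        long[] exit = {1519254458167640292L, 1519254460158076301L, 1519254461153294238L,
                       1519254462148512207L, 1519254463143730191L};
        long size = 1_000_000L; // same number as Time.milliseconds(1000000)

        // Raw values treated as milliseconds: prints false
        System.out.println(sharesAnyWindow(in, exit, size));
        // Values converted to milliseconds first: prints true
        System.out.println(sharesAnyWindow(nanosToMillis(in), nanosToMillis(exit), size));
    }
}
```

If the values are indeed nanoseconds, one option would be to return `record.f0 / 1_000_000L` from `extractTimestamp` in both extractors, or otherwise to size the window in the same unit as the raw values. Either way the sample records would then share a window and the join could emit matches.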