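I am trying to use a process function to do some processing on a set of events. I am using event time and a keyed stream. The problem I am facing is that the watermark value is always 9223372036854775808. I added print statements to debug, and the output looks like this:
timestamp----1583128014000 extractedTimestamp 1583128014000 currentwatermark----9223372036854775808
timestamp----1583128048000 extractedTimestamp 1583128048000 currentwatermark----9223372036854775808
timestamp----1583128089000 extractedTimestamp 1583128089000 currentwatermark----9223372036854775808
So timestamp and extractedTimestamp are changing, but the watermark is not being updated. As a result, no record gets into the queue, because context.timestamp is never less than the watermark.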
DataStream<GenericRecord> dataStream = env.addSource(searchConsumer).name("search_list_keyless");
DataStream<GenericRecord> dataStreamWithWaterMark = dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
try {
    dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
        StringBuilder builder = new StringBuilder();
        builder.append(record.get("session_id"));
        builder.append(record.get("user_id"));
        return builder.toString();
    }).process(new MatchFunction()).print();
}
catch (Exception e){
    e.printStackTrace();
}
env.execute("start session process");
}
public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        long timestamp = (long) record.get("event_ts");
        System.out.println("timestamp------"+ timestamp);
        return timestamp;
    }
    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        System.out.println("extractedTimestamp "+extractedTimestamp);
        return new Watermark(extractedTimestamp - 30000);
    }
}
Here is the code for the process function:
public class MatchFunction extends KeyedProcessFunction<String, GenericRecord, Object> {
    private ValueState<Tuple2<Long, PriorityQueue<GenericRecord>>> queueState = null;
    @Override
    public void open(Configuration config) throws Exception {
        System.out.println("open");
        ValueStateDescriptor<Tuple2<Long, PriorityQueue<GenericRecord>>> descriptor = new ValueStateDescriptor<>(
            "sorted-events", TypeInformation.of(new TypeHint<Tuple2<Long, PriorityQueue<GenericRecord>>>() {
            })
        );
        queueState = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
        Tuple2<Long, PriorityQueue<GenericRecord>> tuple = queueState.value();
        PriorityQueue<GenericRecord> records = tuple.f1;
    }
    @Override
    public void processElement(GenericRecord record, Context context, Collector<Object> collector) throws Exception {
        TimerService timerService = context.timerService();
        System.out.println("currentwatermark----"+ timerService.currentWatermark());
        if (context.timestamp() > timerService.currentWatermark()) {
            Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
            PriorityQueue<GenericRecord> queue = queueval.f1;
            long startTime = queueval.f0;
            System.out.println("starttime----"+ startTime);
            if (queue == null) {
                queue = new PriorityQueue<>(10, new TimeStampComprator());
                startTime = (long) record.get("event_ts");
            }
            queueState.update(new Tuple2<>(startTime, queue));
            timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
        }
    }
}
1 Answer
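Here is a possible explanation for what you have shared:
The TimestampsAndPunctuatedWatermarksOperator calls extractTimestamp before it calls checkAndGetNextWatermark for a given record. This means that the first time processElement in your MatchFunction is called in each task (parallel instance), the current watermark will be Long.MIN_VALUE (that is, -9223372036854775808). If your parallelism is large enough, that could explain seeing this value several times.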
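The ordering described above can be illustrated with a minimal plain-Java sketch. This is a simulation, not Flink code: the class WatermarkOrderingSketch, its nested Downstream class, and the run method are hypothetical names made up for illustration. It assumes only what the explanation states, namely that each record reaches the downstream function before the watermark generated from that same record, and it reuses the 30-second lag from SessionAssigner.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkOrderingSketch {

    /** Hypothetical stand-in for the downstream function: it remembers the last watermark it received. */
    static class Downstream {
        // Value reported before any watermark has arrived.
        long currentWatermark = Long.MIN_VALUE;
        final List<Long> seenAtProcessElement = new ArrayList<>();

        void processElement(long timestamp) {
            // Record which watermark is visible when this element is processed.
            seenAtProcessElement.add(currentWatermark);
        }

        void processWatermark(long watermark) {
            // Watermarks never move backwards.
            currentWatermark = Math.max(currentWatermark, watermark);
        }
    }

    /** Simulates the assumed ordering: the record is forwarded first, its watermark follows. */
    static List<Long> run(long[] eventTimestamps, long lagMillis) {
        Downstream downstream = new Downstream();
        for (long ts : eventTimestamps) {
            downstream.processElement(ts);
            downstream.processWatermark(ts - lagMillis);
        }
        return downstream.seenAtProcessElement;
    }

    public static void main(String[] args) {
        long[] events = {1583128014000L, 1583128048000L, 1583128089000L};
        for (long wm : run(events, 30_000L)) {
            System.out.println("currentwatermark----" + wm);
        }
    }
}
```

In this sketch the first element sees Long.MIN_VALUE, and every later element sees the watermark derived from the element before it. If each parallel instance only logs its first element, every logged watermark would be Long.MIN_VALUE, which matches the explanation above.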