…appears in the form -9223372036854775808

u4dcyp6a posted on 2021-06-24 in Flink

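I am trying to use a process function to handle a group of events. I am using event time and a keyed stream. The problem I am facing is that the watermark value is always -9223372036854775808. I added print statements for debugging, and they show the following: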
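    timestamp----1583128014000 extractedTimestamp 1583128014000 currentwatermark-----9223372036854775808
    timestamp----1583128048000 extractedTimestamp 1583128048000 currentwatermark-----9223372036854775808
    timestamp----1583128089000 extractedTimestamp 1583128089000 currentwatermark-----9223372036854775808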
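So timestamp and extractedTimestamp keep changing, but the watermark is never updated. As a result no records go into the queue, as context.timestamp() is never less than the watermark.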

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

        // ... inside main(); env and searchConsumer are created earlier (not shown)
        DataStream<GenericRecord> dataStream = env.addSource(searchConsumer).name("search_list_keyless");
        DataStream<GenericRecord> dataStreamWithWaterMark = dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
        try {
            // key by the concatenation of session_id and user_id
            dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
                StringBuilder builder = new StringBuilder();
                builder.append(record.get("session_id"));
                builder.append(record.get("user_id"));
                return builder.toString();
            }).process(new MatchFunction()).print();
        } catch (Exception e) {
            e.printStackTrace();
        }
        env.execute("start session process");
    } // end of main()

    // Emits a watermark after every record, 30 s behind that record's timestamp.
    public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
        @Override
        public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
            long timestamp = (long) record.get("event_ts");
            System.out.println("timestamp------" + timestamp);
            return timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
            // simply emit a watermark with every event
            System.out.println("extractedTimestamp " + extractedTimestamp);
            return new Watermark(extractedTimestamp - 30000);
        }
    }

Here is the code for the process function:

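    import java.util.PriorityQueue;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimerService;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class MatchFunction extends KeyedProcessFunction<String, GenericRecord, Object> {
        // per-key state: (session start time, queue of events ordered by TimeStampComprator)
        private ValueState<Tuple2<Long, PriorityQueue<GenericRecord>>> queueState = null;

        @Override
        public void open(Configuration config) throws Exception {
            System.out.println("open");
            ValueStateDescriptor<Tuple2<Long, PriorityQueue<GenericRecord>>> descriptor = new ValueStateDescriptor<>(
                    "sorted-events", TypeInformation.of(new TypeHint<Tuple2<Long, PriorityQueue<GenericRecord>>>() {}));
            queueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            Tuple2<Long, PriorityQueue<GenericRecord>> tuple = queueState.value();
            PriorityQueue<GenericRecord> records = tuple.f1;
            // (the code that emits the sorted records was not included in the question)
        }

        @Override
        public void processElement(GenericRecord record, Context context, Collector<Object> collector) throws Exception {
            TimerService timerService = context.timerService();
            System.out.println("currentwatermark----" + timerService.currentWatermark());
            if (context.timestamp() > timerService.currentWatermark()) {
                // value() returns null until state has been written once for this key
                Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
                PriorityQueue<GenericRecord> queue = queueval == null ? null : queueval.f1;
                long startTime = queueval == null ? 0L : queueval.f0;
                System.out.println("starttime----" + startTime);
                if (queue == null) {
                    // TimeStampComprator is the asker's own comparator (not shown)
                    queue = new PriorityQueue<>(10, new TimeStampComprator());
                    startTime = (long) record.get("event_ts");
                }
                // note: the record itself is not added to the queue here
                queueState.update(new Tuple2<>(startTime, queue));
                // fire 5 minutes (in event time) after the session start
                timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
            }
        }
    }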
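For reference, once the watermark does advance, the event-time timer registered in processElement fires only when the watermark reaches startTime + 5 * 60 * 1000. The sketch below is a hypothetical plain-Java model (no Flink dependency; the class and method names are invented for illustration). It replays the three logged timestamps through the assigner's extractedTimestamp - 30000 rule, assuming all three records share one key so that startTime is the first event_ts.

```java
// Hypothetical plain-Java model, not Flink code. It recomputes the watermarks that
// SessionAssigner would emit for the logged timestamps and checks them against the
// timer MatchFunction registers for the first record (assumption: all records share one key).
final class SessionTimerSketch {
    static final long OUT_OF_ORDERNESS_MS = 30_000L;      // from checkAndGetNextWatermark
    static final long SESSION_LENGTH_MS = 5 * 60 * 1000L; // from processElement

    static long watermarkFor(long extractedTimestamp) {
        return extractedTimestamp - OUT_OF_ORDERNESS_MS;
    }

    // Flink fires an event-time timer once the watermark has reached the timer's timestamp.
    static boolean timerFires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        long[] eventTs = {1583128014000L, 1583128048000L, 1583128089000L}; // from the log
        long timer = eventTs[0] + SESSION_LENGTH_MS; // startTime + 5 minutes
        for (long ts : eventTs) {
            long wm = watermarkFor(ts);
            System.out.println("event " + ts + " -> watermark " + wm
                    + ", timer " + timer + " fires: " + timerFires(timer, wm));
        }
    }
}
```

All three lines report false: the timer sits at 1583128314000, while the highest watermark these events produce is 1583128059000, so in this model an event with event_ts of at least 1583128344000 would be needed before onTimer runs.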
hmtdttj4 (answer #1)

Here is a possible explanation of what you have shared:
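The TimestampsAndPunctuatedWatermarksOperator calls extractTimestamp and forwards the record before it calls checkAndGetNextWatermark for that same record. This means that the first time processElement in your MatchFunction is called in each task (parallel instance), the current watermark will be Long.MIN_VALUE (i.e., -9223372036854775808).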
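A minimal plain-Java model of that ordering, assuming a single channel between the assigner and MatchFunction (parallelism 1). The class and method names are hypothetical; this is not Flink's code, only a model of "forward the record, then emit its watermark", plus the rule that a punctuated watermark is forwarded only when it is larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: a punctuated assigner forwards each record
// before the watermark derived from it, so processElement sees the watermark of
// the previous record, and Long.MIN_VALUE for the very first one.
final class PunctuatedOrderingSketch {

    static List<Long> watermarksSeenByProcessElement(long[] timestamps, long delayMs) {
        List<Long> seen = new ArrayList<>();
        long downstreamWatermark = Long.MIN_VALUE; // an operator's initial watermark
        for (long ts : timestamps) {
            // 1. extractTimestamp, then the record is forwarded: processElement runs here
            seen.add(downstreamWatermark);
            // 2. checkAndGetNextWatermark: its result arrives after the record,
            //    and only a watermark larger than the last one is forwarded
            long next = ts - delayMs;
            if (next > downstreamWatermark) {
                downstreamWatermark = next;
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] ts = {1583128014000L, 1583128048000L, 1583128089000L}; // from the question's log
        System.out.println(watermarksSeenByProcessElement(ts, 30_000L));
    }
}
```

Running main prints [-9223372036854775808, 1583127984000, 1583128018000]: in this single-channel model only the first record sees Long.MIN_VALUE.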
If your parallelism is high enough, that would explain seeing

    currentwatermark-----9223372036854775808

several times.
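The parallelism point can be pictured with a second hypothetical model. It relies on a Flink rule the answer does not spell out: an operator with several input channels advances its event-time clock to the minimum of the watermarks received on those channels. Assumptions, labeled in the code as well: parallelism upstream assigner instances feed one MatchFunction instance, and records are dealt to those upstream instances round-robin. Names are invented for illustration.

```java
import java.util.Arrays;

// Hypothetical model, not Flink code. `parallelism` upstream assigner instances
// feed one downstream MatchFunction instance; records are assumed to be dealt to
// the upstream instances round-robin. The downstream watermark is the minimum over
// all input channels, so it stays at Long.MIN_VALUE until every channel has
// delivered at least one watermark.
final class ParallelWatermarkSketch {

    // Counts processElement calls that would print currentwatermark-----9223372036854775808
    static int callsSeeingMinValue(int parallelism, long[] timestamps, long delayMs) {
        long[] channelWatermarks = new long[parallelism];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        int count = 0;
        for (int i = 0; i < timestamps.length; i++) {
            int channel = i % parallelism; // assumption: round-robin distribution
            long current = Arrays.stream(channelWatermarks).min().getAsLong();
            if (current == Long.MIN_VALUE) {
                count++;
            }
            // the record's own watermark reaches the downstream operator after the record
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamps[i] - delayMs);
        }
        return count;
    }

    public static void main(String[] args) {
        long[] ts = {1583128014000L, 1583128048000L, 1583128089000L};
        for (int p = 1; p <= 4; p++) {
            System.out.println("parallelism " + p + ": " + callsSeeingMinValue(p, ts, 30_000L));
        }
    }
}
```

With these three timestamps the model reports 1, 2, 3 and 3 calls for parallelism 1 to 4. In the same model, an upstream instance that never receives a record keeps its channel at Long.MIN_VALUE indefinitely, which pins the downstream watermark there.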
