Usage and code examples of the org.apache.flink.streaming.api.watermark.Watermark class

x33g5p2x, reposted on 2022-02-03 under "Other"

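This article collects Java code examples for the org.apache.flink.streaming.api.watermark.Watermark class and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Watermark class:
Package path: org.apache.flink.streaming.api.watermark.Watermark
Class name: Watermark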

Watermark overview

A Watermark tells operators that no elements with a timestamp older than or equal to the watermark timestamp should arrive at the operator. Watermarks are emitted at the sources and propagate through the operators of the topology. Operators must themselves emit watermarks to downstream operators using org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark). Operators that do not internally buffer elements can always forward the watermark that they receive. Operators that buffer elements, such as window operators, must forward a watermark after emission of elements that is triggered by the arriving watermark.

In some cases a watermark is only a heuristic and operators should be able to deal with late elements. They can either discard those or update the result and emit updates/retractions to downstream operations.

When a source closes it will emit a final watermark with timestamp Long.MAX_VALUE. When an operator receives this it will know that no more input will be arriving in the future.
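
The semantics described above can be sketched without Flink on the classpath. This is a minimal illustration under stated assumptions, not Flink's implementation: `SketchWatermark` and `LateDroppingOperator` are hypothetical stand-ins introduced here, and discarding late elements is only one of the two options the text mentions.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkSemanticsSketch {

    // Hypothetical stand-in for Flink's Watermark class (assumption, not the real API).
    static final class SketchWatermark {
        static final SketchWatermark MAX_WATERMARK = new SketchWatermark(Long.MAX_VALUE);
        private final long timestamp;

        SketchWatermark(long timestamp) {
            this.timestamp = timestamp;
        }

        long getTimestamp() {
            return timestamp;
        }
    }

    // Hypothetical non-buffering operator: discards late elements, tracks the watermark.
    static final class LateDroppingOperator {
        private long currentWatermark = Long.MIN_VALUE;
        final List<Long> accepted = new ArrayList<>();
        final List<Long> dropped = new ArrayList<>();

        void processElement(long timestamp) {
            // An element is late if its timestamp is older than or equal to the watermark.
            if (timestamp <= currentWatermark) {
                dropped.add(timestamp);
            } else {
                accepted.add(timestamp);
            }
        }

        void processWatermark(SketchWatermark mark) {
            // Never let the watermark move backwards.
            currentWatermark = Math.max(currentWatermark, mark.getTimestamp());
        }

        boolean endOfInputReached() {
            // A watermark of Long.MAX_VALUE signals that no more input will arrive.
            return currentWatermark == Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        LateDroppingOperator op = new LateDroppingOperator();
        op.processElement(10);
        op.processWatermark(new SketchWatermark(15));
        op.processElement(15); // late: equal to the watermark
        op.processElement(16); // on time
        op.processWatermark(SketchWatermark.MAX_WATERMARK);
        System.out.println("accepted=" + op.accepted
                + " dropped=" + op.dropped
                + " endOfInput=" + op.endOfInputReached());
    }
}
```

Running `main` prints `accepted=[10, 16] dropped=[15] endOfInput=true`. An operator that prefers to update results instead of discarding late elements would replace the `dropped.add(...)` branch.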

Code examples

Code example source: origin: apache/flink

@Override
public final Watermark getCurrentWatermark() {
  // this guarantees that the watermark never goes backwards.
  long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
  if (potentialWM >= lastEmittedWatermark) {
    lastEmittedWatermark = potentialWM;
  }
  return new Watermark(lastEmittedWatermark);
}
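
To see how the bounded out-of-orderness rule in the snippet above behaves, here is a self-contained sketch. The class `BoundedOutOfOrdernessSketch` and its methods `onElement` and `currentWatermark` are hypothetical names; the initial value chosen for `currentMaxTimestamp` is an assumption made here so that the subtraction cannot underflow before the first element arrives.

```java
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness must be >= 0");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Assumption: start high enough that the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Track the largest timestamp seen so far.
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    // Same rule as the snippet above: lag behind the max timestamp, never go backwards.
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5);
        long[] timestamps = {100, 103, 98, 110};
        for (long ts : timestamps) {
            wm.onElement(ts);
            System.out.println("element ts=" + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

With a bound of 5, the watermarks after each element are 95, 98, 98, and 105: the out-of-order element with timestamp 98 does not pull the watermark back.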

Code example source: origin: apache/flink

@Override
protected boolean allowWatermark(Watermark mark) {
  // allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits
  return mark.getTimestamp() == Long.MAX_VALUE && nextWatermarkTime != Long.MAX_VALUE;
}

Code example source: origin: apache/flink

public void processWatermark1(Watermark mark) throws Exception {
  input1Watermark = mark.getTimestamp();
  long newMin = Math.min(input1Watermark, input2Watermark);
  if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
  }
}
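
The two-input rule in the snippet above (the combined watermark is the minimum of both inputs and only ever advances) can be traced with a small standalone sketch. `TwoInputWatermarkSketch` is a hypothetical class, and initializing all three fields to `Long.MIN_VALUE` is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class TwoInputWatermarkSketch {
    // Assumption: all watermarks start at Long.MIN_VALUE.
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Records every combined watermark that would be sent downstream.
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advance();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advance();
    }

    private void advance() {
        // The slower input holds the combined watermark back.
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        op.processWatermark1(10); // input 2 has no watermark yet, nothing is emitted
        op.processWatermark2(7);  // min(10, 7) = 7
        op.processWatermark2(12); // min(10, 12) = 10
        op.processWatermark1(11); // min(11, 12) = 11
        System.out.println(op.emitted);
    }
}
```

Running `main` prints `[7, 10, 11]`. Nothing is emitted until both inputs have reported a watermark.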

Code example source: origin: apache/flink

/**
 * Verify that no StreamRecord has a timestamp equal to or earlier than a watermark that
 * precedes it. This is checked over the order of the elements.
 *
 * @param elements An iterable containing StreamRecords and watermarks
 */
public static void assertNoLateRecords(Iterable<Object> elements) {
  // check that no watermark is violated
  long highestWatermark = Long.MIN_VALUE;

  for (Object elem : elements) {
    if (elem instanceof Watermark) {
      highestWatermark = ((Watermark) elem).asWatermark().getTimestamp();
    } else if (elem instanceof StreamRecord) {
      boolean dataIsOnTime = highestWatermark < ((StreamRecord) elem).getTimestamp();
      Assert.assertTrue("Late data was emitted after join", dataIsOnTime);
    }
  }
}

Code example source: origin: apache/flink

@Override
protected void handleWatermark(Watermark mark) {
  // only the final end-of-input watermark is forwarded
  if (mark.equals(Watermark.MAX_WATERMARK)) {
    output.emitWatermark(mark);
    lastWatermark = Long.MAX_VALUE;
  }
}

Code example source: origin: apache/flink

public void processWatermark2(Watermark mark) throws Exception {
  input2Watermark = mark.getTimestamp();
  long newMin = Math.min(input1Watermark, input2Watermark);
  if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
  }
}

Code example source: origin: apache/flink

@Override
public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
  return new Watermark(counter - 1);
}

Code example source: origin: apache/flink

@Override
public void processWatermark(Watermark mark) throws Exception {
  // if we receive a Long.MAX_VALUE watermark we forward it since it is used
  // to signal the end of input and to not block watermark progress downstream
  if (mark.getTimestamp() == Long.MAX_VALUE && mark.getTimestamp() > currentWatermark) {
    currentWatermark = Long.MAX_VALUE;
    output.emitWatermark(mark);
  }
}

Code example source: origin: apache/flink

/**
 * Checks whether a new per-partition watermark is also a new cross-partition watermark.
 */
private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
  if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
    long newMin = Long.MAX_VALUE;
    for (KafkaTopicPartitionState<?> state : subscribedPartitionStates) {
      @SuppressWarnings("unchecked")
      final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
          (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
      newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
    }
    // double-check locking pattern
    if (newMin > maxWatermarkSoFar) {
      synchronized (checkpointLock) {
        if (newMin > maxWatermarkSoFar) {
          maxWatermarkSoFar = newMin;
          sourceContext.emitWatermark(new Watermark(newMin));
        }
      }
    }
  }
}

Code example source: origin: apache/flink

@Override
public final Watermark getCurrentWatermark() {
  return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
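
The "timestamp minus one" rule in the snippet above follows from the watermark contract: an element whose timestamp equals the watermark already counts as late, so the watermark has to stay one step behind the latest element. A minimal sketch, assuming a hypothetical class `AscendingWatermarkSketch` that simply ignores timestamps smaller than the current one:

```java
class AscendingWatermarkSketch {
    private long currentTimestamp = Long.MIN_VALUE;

    void onElement(long timestamp) {
        // Assumption for this sketch: timestamps that go backwards are ignored.
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        }
    }

    long currentWatermark() {
        // Guard against underflow when no element has been seen yet.
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch wm = new AscendingWatermarkSketch();
        System.out.println("before any element: " + wm.currentWatermark());
        wm.onElement(42);
        System.out.println("after ts=42: " + wm.currentWatermark());
        wm.onElement(43);
        System.out.println("after ts=43: " + wm.currentWatermark());
    }
}
```

After an element with timestamp 42 the watermark is 41, so a second element that also carries timestamp 42 is still on time.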

Code example source: origin: apache/flink

@Nullable
public Watermark checkAndGetNewWatermark(T record, long timestamp) {
  Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
  if (mark != null && mark.getTimestamp() > partitionWatermark) {
    partitionWatermark = mark.getTimestamp();
    return mark;
  }
  else {
    return null;
  }
}

Code example source: origin: apache/flink

@Override
public void processWatermark(Watermark mark) throws Exception {
  // forwards a new watermark with twice the incoming timestamp
  output.emitWatermark(new Watermark(mark.getTimestamp() * 2));
}

Code example source: origin: apache/flink

@Nullable
@Override
public Watermark getCurrentWatermark() {
  return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}

Code example source: origin: apache/flink

public void advanceWatermark(Watermark watermark) throws Exception {
  for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
    service.advanceWatermark(watermark.getTimestamp());
  }
}

Code example source: origin: apache/flink

@Override
public Watermark getCurrentWatermark() {
  // make sure timestamps are monotonically increasing, even when the system clock re-syncs
  final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
  maxTimestamp = now;
  return new Watermark(now - 1);
}

Code example source: origin: apache/flink

@Override
public void processWatermark(Watermark mark) throws Exception {
  super.processWatermark(mark);
  this.currentWatermark = mark.getTimestamp();
}

Code example source: origin: apache/flink

@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement, long extractedTimestamp) {
  return new Watermark(lastElement.f2 - 1);
}

Code example source: origin: apache/flink

public long getCurrentWatermarkTimestamp() {
  Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
  if (wm != null) {
    partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
  }
  return partitionWatermark;
}

Code example source: origin: apache/flink

@Override
public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
  // trail the element's timestamp by one so that the element itself is not late
  return new Watermark(extractedTimestamp - 1);
}

Code example source: origin: apache/flink

@Override
public void processWatermark(Watermark mark) throws Exception {
  super.processWatermark(mark);
  currentWatermark = mark.getTimestamp();
}
