org.apache.flink.api.common.accumulators.Accumulator类的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(152)

本文整理了Java中org.apache.flink.api.common.accumulators.Accumulator类的一些代码示例,展示了Accumulator类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Accumulator类的具体详情如下:
包路径:org.apache.flink.api.common.accumulators.Accumulator
类名称:Accumulator

Accumulator介绍

[英]Accumulators collect distributed statistics or aggregates in a from user functions and operators. Each parallel instance creates and updates its own accumulator object, and the different parallel instances of the accumulator are later merged. merged by the system at the end of the job. The result can be obtained from the result of a job execution, or from the web runtime monitor. The accumulators are inspired by the Hadoop/MapReduce counters. The type added to the accumulator might differ from the type returned. This is the case e.g. for a set-accumulator: We add single objects, but the result is a set of objects.
[中]累加器从用户函数和运算符中收集分布式统计信息或聚合。每个并行实例创建并更新自己的累加器对象,累加器的不同并行实例稍后合并。作业结束时由系统合并。可以从作业执行的结果或web运行时监视器获得结果。累加器的设计灵感来自Hadoop/MapReduce计数器。添加到累加器的类型可能与返回的类型不同。例如,对于集合累加器,情况就是这样:我们添加单个对象,但结果是一组对象。

代码示例

代码示例来源:origin: apache/flink

@Override
public void merge(Accumulator<Long, Long> other) {
  this.localValue += other.getLocalValue();
}

代码示例来源:origin: apache/flink

public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
  Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();
  for (Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
    result.put(entry.getKey(), entry.getValue().clone());
  }
  return result;
}

代码示例来源:origin: apache/flink

/**
 * Workaround method for type safety.
 */
private static <V, R extends Serializable> Accumulator<V, R> mergeSingle(Accumulator<?, ?> target,
                                     Accumulator<?, ?> toMerge) {
  @SuppressWarnings("unchecked")
  Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
  @SuppressWarnings("unchecked")
  Accumulator<V, R> typedToMerge = (Accumulator<V, R>) toMerge;
  typedTarget.merge(typedToMerge);
  return typedTarget;
}

代码示例来源:origin: apache/flink

@Override
  public void invoke(T value) throws Exception {
    count++;
    getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
  }
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

@SuppressWarnings("unchecked")
void commitForTasks(JobVertexID jobVertexId, Accumulator value, Set<Integer> committedTasks) {
  checkArgument(this.jobVertexId.equals(jobVertexId), "The registered task belongs to different JobVertex with previous registered ones");
  if (aggregatedValue == null) {
    aggregatedValue = value.clone();
  } else {
    aggregatedValue.merge(value);
  }
  this.committedTasks.addAll(committedTasks);
}

代码示例来源:origin: apache/flink

@Override
  public void invoke(T value, Context context) throws Exception {
    count++;
    getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
  }
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

@SuppressWarnings("unchecked")
void commitForTask(JobVertexID jobVertexId, int subtaskIndex, Accumulator value) {
  checkArgument(this.jobVertexId.equals(jobVertexId),
    "The registered task belongs to different JobVertex with previous registered ones");
  checkState(registeredTasks.contains(subtaskIndex), "Can not commit for an accumulator that has " +
    "not been registered before");
  if (aggregatedValue == null) {
    aggregatedValue = value.clone();
  } else {
    aggregatedValue.merge(value);
  }
  committedTasks.add(subtaskIndex);
}

代码示例来源:origin: apache/flink

@Override
public void merge(Accumulator<Integer, Integer> other) {
  this.localValue += other.getLocalValue();
}

代码示例来源:origin: apache/flink

@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
  out.collect(value);
  getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}

代码示例来源:origin: apache/flink

wrapUnchecked(otherEntry.getKey(), () -> otherEntry.getValue().clone()));
wrapUnchecked(otherEntry.getKey(), () -> mergeSingle(accumulator, otherEntry.getValue().clone())));

代码示例来源:origin: com.alibaba.blink/flink-core

/**
 * Workaround method for type safety.
 */
private static <V, R extends Serializable> Accumulator<V, R> mergeSingle(Accumulator<?, ?> target,
                                     Accumulator<?, ?> toMerge) {
  @SuppressWarnings("unchecked")
  Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
  @SuppressWarnings("unchecked")
  Accumulator<V, R> typedToMerge = (Accumulator<V, R>) toMerge;
  typedTarget.merge(typedToMerge);
  return typedTarget;
}

代码示例来源:origin: apache/flink

@Override
public void merge(Accumulator<Double, Double> other) {
  this.localValue += other.getLocalValue();
}

代码示例来源:origin: apache/flink

@Override
  public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
    out.collect(value);
    ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
    if (state == null) {
      throw new RuntimeException("Missing key value state for " + value);
    }
    assertEquals(value.f1, state.value());
    getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
  }
}

代码示例来源:origin: org.apache.flink/flink-core

public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
  Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();
  for (Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
    result.put(entry.getKey(), entry.getValue().clone());
  }
  return result;
}

代码示例来源:origin: org.apache.flink/flink-core

/**
 * Workaround method for type safety.
 */
private static <V, R extends Serializable> Accumulator<V, R> mergeSingle(Accumulator<?, ?> target,
                                     Accumulator<?, ?> toMerge) {
  @SuppressWarnings("unchecked")
  Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
  @SuppressWarnings("unchecked")
  Accumulator<V, R> typedToMerge = (Accumulator<V, R>) toMerge;
  typedTarget.merge(typedToMerge);
  return typedTarget;
}

代码示例来源:origin: apache/flink

@Override
public void merge(Accumulator<Long, Long> other) {
  this.min = Math.min(this.min, other.getLocalValue());
}

代码示例来源:origin: apache/flink

@Override
  public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
    out.collect(value);
    ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
    if (state == null) {
      throw new RuntimeException("Missing key value state for " + value);
    }
    assertEquals(value.f1, state.value());
    getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
  }
}

代码示例来源:origin: com.alibaba.blink/flink-core

public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
  Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();
  for (Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
    result.put(entry.getKey(), entry.getValue().clone());
  }
  return result;
}

代码示例来源:origin: apache/flink

@Override
public void merge(Accumulator<Integer, Integer> other) {
  this.max = Math.max(this.max, other.getLocalValue());
}

代码示例来源:origin: apache/flink

@Override
  public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
    out.collect(value);
    ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
    if (state == null) {
      throw new RuntimeException("Missing key value state for " + value);
    }
    assertEquals(value.f1, state.value());
    getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
  }
}

相关文章