Usage and code examples for the org.apache.flink.runtime.execution.Environment.getTaskInfo() method

x33g5p2x, reposted on 2022-01-19 in Other

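This article collects code examples for the Java method org.apache.flink.runtime.execution.Environment.getTaskInfo() and shows how Environment.getTaskInfo() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected projects, and are meant as a practical reference. Details of Environment.getTaskInfo():
Fully qualified class name: org.apache.flink.runtime.execution.Environment
Class name: Environment
Method name: getTaskInfo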

About Environment.getTaskInfo

Returns the TaskInfo object associated with this subtask.
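To make the returned object concrete, here is a minimal sketch of the accessors that the snippets on this page call on it. TaskInfoSketch and EnvironmentSketch are hypothetical stand-ins written for illustration, not Flink classes. Only the accessor names (getTaskName, getIndexOfThisSubtask, getNumberOfParallelSubtasks, getTaskNameWithSubtasks) and the "taskname (2/5)" format are taken from the snippets here; the real TaskInfo may format names differently depending on the Flink version.

```java
/** Runs the sketch: reads task metadata through an Environment-like accessor. */
class TaskInfoDemo {
    public static void main(String[] args) {
        // Hypothetical values: the second of five parallel subtasks of a task named "Map".
        TaskInfoSketch info = new TaskInfoSketch("Map", 1, 5);
        EnvironmentSketch env = () -> info;

        TaskInfoSketch taskInfo = env.getTaskInfo();
        System.out.println(taskInfo.getTaskName());                  // Map
        System.out.println(taskInfo.getIndexOfThisSubtask());        // 1
        System.out.println(taskInfo.getNumberOfParallelSubtasks());  // 5
        System.out.println(taskInfo.getTaskNameWithSubtasks());      // Map (2/5)
    }
}

/** Hypothetical stand-in for Environment, reduced to the single method discussed here. */
interface EnvironmentSketch {
    TaskInfoSketch getTaskInfo();
}

/** Hypothetical stand-in for TaskInfo: an immutable holder of per-subtask metadata. */
final class TaskInfoSketch {
    private final String taskName;
    private final int indexOfThisSubtask;        // zero-based
    private final int numberOfParallelSubtasks;

    TaskInfoSketch(String taskName, int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.taskName = taskName;
        this.indexOfThisSubtask = indexOfThisSubtask;
        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
    }

    String getTaskName() {
        return taskName;
    }

    int getIndexOfThisSubtask() {
        return indexOfThisSubtask;
    }

    int getNumberOfParallelSubtasks() {
        return numberOfParallelSubtasks;
    }

    /** Assumed format "taskname (2/5)": the displayed position is the zero-based index plus one. */
    String getTaskNameWithSubtasks() {
        return taskName + " (" + (indexOfThisSubtask + 1) + "/" + numberOfParallelSubtasks + ")";
    }
}
```

The subtask index is zero-based in this sketch, which is why index 1 is displayed as position 2 in the combined name.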

Code examples

Code example source: apache/flink

/**
 * Gets the name of the task, in the form "taskname (2/5)".
 * @return The name of the task.
 */
public String getName() {
    return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
}

Code example source: apache/flink

@VisibleForTesting
public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
    Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());

    // Create one record writer per outgoing edge, each labelled with the task name.
    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge edge = outEdgesInOrder.get(i);
        recordWriters.add(
            createRecordWriter(
                edge,
                i,
                environment,
                environment.getTaskInfo().getTaskName(),
                chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
    }
    return recordWriters;
}

Code example source: apache/flink

@Override
public void init() throws Exception {
    final String iterationId = getConfiguration().getIterationId();
    if (iterationId == null || iterationId.length() == 0) {
        throw new Exception("Missing iteration ID in the task configuration");
    }

    // The broker ID combines the job ID, the iteration ID, and this subtask's index.
    final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
        getEnvironment().getTaskInfo().getIndexOfThisSubtask());

    final long iterationWaitTime = getConfiguration().getIterationWaitTime();

    LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);

    @SuppressWarnings("unchecked")
    BlockingQueue<StreamRecord<IN>> dataChannel =
        (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);

    LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);

    this.headOperator = new RecordPusher<>();
    this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));

    // call super.init() last because that needs this.headOperator to be set up
    super.init();
}

Code example source: apache/flink

        @Nonnull MetricGroup metricGroup) throws Exception {

    TaskInfo taskInfo = environment.getTaskInfo();
    OperatorSubtaskDescriptionText operatorSubtaskDescription =
        new OperatorSubtaskDescriptionText(

Code example source: apache/flink

public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
        Environment env, Map<String, Accumulator<?, ?>> accumulators) {
    super(env.getTaskInfo(),
        env.getUserClassLoader(),
        operator.getExecutionConfig(),
        accumulators,
        env.getDistributedCacheEntries(),
        operator.getMetricGroup());

    this.operator = operator;
    this.taskEnvironment = env;
    this.streamConfig = new StreamConfig(env.getTaskConfiguration());
    this.operatorUniqueID = operator.getOperatorID().toString();
}

Code example source: apache/flink

getEnvironment().getTaskInfo().getIndexOfThisSubtask());

Code example source: apache/flink

TaskInfo taskInfo = environment.getTaskInfo();

Code example source: org.apache.flink/flink-runtime_2.10

/**
 * Returns the current number of subtasks the respective task is split into.
 *
 * @return the current number of subtasks the respective task is split into
 */
public int getCurrentNumberOfSubtasks() {
    return this.environment.getTaskInfo().getNumberOfParallelSubtasks();
}

Code example source: org.apache.flink/flink-runtime_2.10

/**
 * Returns the index of this subtask in the subtask group.
 *
 * @return the index of this subtask in the subtask group
 */
public int getIndexInSubtaskGroup() {
    return this.environment.getTaskInfo().getIndexOfThisSubtask();
}

Code example source: org.apache.flink/flink-streaming-java_2.10

/**
 * Gets the name of the task, in the form "taskname (2/5)".
 * @return The name of the task.
 */
public String getName() {
    return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
}

Code example source: org.apache.flink/flink-streaming-java_2.11

/**
 * Gets the name of the task, in the form "taskname (2/5)".
 * @return The name of the task.
 */
public String getName() {
    return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
}

Code example source: com.alibaba.blink/flink-runtime

/**
 * Utility function that composes a string for logging purposes. The string includes the given message and
 * the index of the task in its task group together with the number of tasks in the task group.
 *
 * @param message The main message for the log.
 * @return The string ready for logging.
 */
private String getLogString(String message) {
    return BatchTask.constructLogString(message, this.getEnvironment().getTaskInfo().getTaskName(), this);
}

Code example source: org.apache.flink/flink-runtime_2.10

/**
 * Utility function that composes a string for logging purposes. The string includes the given message and
 * the index of the task in its task group together with the number of tasks in the task group.
 *
 * @param message The main message for the log.
 * @return The string ready for logging.
 */
private String getLogString(String message) {
    return BatchTask.constructLogString(message, this.getEnvironment().getTaskInfo().getTaskName(), this);
}

Code example source: org.apache.flink/flink-runtime_2.10

public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    // Take the first operator of the chain and strip a leading "CHAIN " prefix, if present.
    String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
    sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
        getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
        getEnvironment().getMetricGroup().addOperator(sourceName));
}
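The createRuntimeContext() snippet above derives an operator name for metrics from the task name. As a rough illustration of what that string handling yields, the sketch below isolates the two lines into a helper. SourceNameDemo, sourceName, and the sample task names are made up for this example and are not part of Flink.

```java
/** Isolates the name-parsing logic from createRuntimeContext() so it can be run on its own. */
class SourceNameDemo {

    /** Returns the first operator name of a chained task name, without a leading "CHAIN " prefix. */
    static String sourceName(String taskName) {
        // Chained task names list operators separated by "->"; keep only the first one.
        String name = taskName.split("->")[0].trim();
        // "CHAIN " is six characters long, hence substring(6).
        return name.startsWith("CHAIN") ? name.substring(6) : name;
    }

    public static void main(String[] args) {
        // Hypothetical task names, chosen only to exercise both branches.
        System.out.println(sourceName("CHAIN DataSource (input) -> Map (tokenize)")); // DataSource (input)
        System.out.println(sourceName("Reduce (sum)"));                               // Reduce (sum)
    }
}
```

Note that the prefix check looks for "CHAIN" but the cut removes six characters, so the logic assumes the prefix is always followed by a space and at least one more character.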

Code example source: org.apache.flink/flink-runtime

public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    // Take the first operator of the chain and strip a leading "CHAIN " prefix, if present.
    String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
    sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
        getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
        getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
}

Code example source: org.apache.flink/flink-runtime_2.11

public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
    Environment env = getEnvironment();

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
        getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
}

Code example source: org.apache.flink/flink-runtime

public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
    Environment env = getEnvironment();

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
        getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
}

Code example source: com.alibaba.blink/flink-runtime

public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
    Environment env = getEnvironment();

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
        getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorRegistry, metrics);
}

Code example source: org.apache.flink/flink-runtime_2.10

public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    // The full task name is used as the operator name for the metric group.
    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
        getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
        getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
}

Code example source: org.apache.flink/flink-runtime_2.10

public String brokerKey() {
    // Built lazily on first use: "<jobID>#<iterationId>#<subtaskIndex>".
    if (brokerKey == null) {
        int iterationId = config.getIterationId();
        brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' +
            getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }
    return brokerKey;
}
