Usage and code examples of the org.apache.flink.runtime.execution.Environment.getWriter() method


This article collects Java code examples of the org.apache.flink.runtime.execution.Environment.getWriter() method and shows how it is used in practice. The snippets were extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. The details of Environment.getWriter() are as follows:

Package: org.apache.flink.runtime.execution
Class: Environment
Method: getWriter

About Environment.getWriter

No official description is available. Judging from the examples below, getWriter(int index) returns the ResultPartitionWriter for the task's output gate at the given index; callers typically wrap it in a RecordWriter (or StreamRecordWriter) to emit records.
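
For orientation, the following is a minimal sketch of that pattern, not taken from the Flink code base: a task asks its Environment for the ResultPartitionWriter of an output gate and wraps it in a RecordWriter. The class name SketchForwardingTask is hypothetical, and the sketch assumes a Flink version contemporary with the snippets below (roughly 1.5 to 1.7), where AbstractInvokable is constructed with an Environment and RecordWriter exposes clearBuffers()/flushAll().

    // A minimal, hypothetical sketch; not taken from the Flink code base.
    import org.apache.flink.core.io.IOReadableWritable;
    import org.apache.flink.runtime.execution.Environment;
    import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

    public class SketchForwardingTask extends AbstractInvokable {

        public SketchForwardingTask(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            // getWriter(0) returns the ResultPartitionWriter for this task's first output gate.
            ResultPartitionWriter partitionWriter = getEnvironment().getWriter(0);

            // Wrap the partition writer in a RecordWriter to emit records, as the examples below do.
            RecordWriter<IOReadableWritable> writer = new RecordWriter<>(partitionWriter);
            try {
                // ... emit records with writer.emit(record) ...
            } finally {
                writer.clearBuffers();
                writer.flushAll();
            }
        }
    }

The index passed to getWriter() corresponds to the order of the task's output gates; examples below that write to several gates simply call getWriter() with different indices (e.g. outputOffset + i).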

Code examples

Code example source: apache/flink

    public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
        super(inputBase.getEnvironment().getWriter(0));
    }

Code example source: apache/flink

    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
            StreamEdge edge,
            int outputIndex,
            Environment environment,
            String taskName,
            long bufferTimeout) {
        @SuppressWarnings("unchecked")
        StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

        LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
        ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

        // we initialize the partitioner here with the number of key groups (aka max. parallelism)
        if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
            int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
            if (0 < numKeyGroups) {
                ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
            }
        }

        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
            RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
    }

Code example source: apache/flink

    @Override
    public void invoke() throws Exception {
        RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));

        try {
            // Determine the amount of data to send per subtask
            int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
            long dataMbPerSubtask = (dataVolumeGb * 10) / getCurrentNumberOfSubtasks();
            long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;

            LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
                getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
                SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));

            boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);

            int numRecords = 0;
            SpeedTestRecord record = new SpeedTestRecord();
            for (long i = 0; i < numRecordsToEmit; i++) {
                if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
                    Thread.sleep(IS_SLOW_SLEEP_MS);
                }

                writer.emit(record);
            }
        }
        finally {
            writer.clearBuffers();
            writer.flushAll();
        }
    }

Code example source: apache/flink

    @Override
    public void invoke() throws Exception {
        RecordReader<SpeedTestRecord> reader = new RecordReader<>(
            getEnvironment().getInputGate(0),
            SpeedTestRecord.class,
            getEnvironment().getTaskManagerInfo().getTmpDirectories());

        RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));

        try {
            SpeedTestRecord record;
            while ((record = reader.next()) != null) {
                writer.emit(record);
            }
        }
        finally {
            reader.clearBuffers();
            writer.clearBuffers();
            writer.flushAll();
        }
    }

Code example source: org.apache.flink/flink-runtime

    @Override
    protected void initOutputs() throws Exception {
        // initialize the regular outputs first (the ones into the step function).
        super.initOutputs();

        // at this time, the outputs to the step function are created
        // add the outputs for the final solution
        List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
        final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
        final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
            userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

        // sanity check the setup
        final int writersIntoStepFunction = this.eventualOutputs.size();
        final int writersIntoFinalResult = finalOutputWriters.size();
        final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

        if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
            throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
        }

        // now, we can instantiate the sync gate
        this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
        this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
    }

Code example source: org.apache.flink/flink-runtime_2.10

    @Override
    protected void initOutputs() throws Exception {
        // initialize the regular outputs first (the ones into the step function).
        super.initOutputs();

        // at this time, the outputs to the step function are created
        // add the outputs for the final solution
        List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
        final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
        final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
            userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

        // sanity check the setup
        final int writersIntoStepFunction = this.eventualOutputs.size();
        final int writersIntoFinalResult = finalOutputWriters.size();
        final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

        if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
            throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
        }

        // now, we can instantiate the sync gate
        this.toSync = getEnvironment().getWriter(syncGateIndex);
    }

Code example source: org.apache.flink/flink-runtime_2.11

    @Override
    protected void initOutputs() throws Exception {
        // initialize the regular outputs first (the ones into the step function).
        super.initOutputs();

        // at this time, the outputs to the step function are created
        // add the outputs for the final solution
        List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
        final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
        final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
            userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

        // sanity check the setup
        final int writersIntoStepFunction = this.eventualOutputs.size();
        final int writersIntoFinalResult = finalOutputWriters.size();
        final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

        if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
            throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
        }

        // now, we can instantiate the sync gate
        this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
        this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
    }

Code example source: com.alibaba.blink/flink-runtime

    @Override
    protected void initOutputs() throws Exception {
        // initialize the regular outputs first (the ones into the step function).
        super.initOutputs();

        // at this time, the outputs to the step function are created
        // add the outputs for the final solution
        List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
        final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
        final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
            userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

        // sanity check the setup
        final int writersIntoStepFunction = this.eventualOutputs.size();
        final int writersIntoFinalResult = finalOutputWriters.size();
        final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

        if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
            throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
        }

        // now, we can instantiate the sync gate
        this.toSync = new RecordWriter<>(getEnvironment().getWriter(syncGateIndex));
        this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
    }

Code example source: org.apache.flink/flink-streaming-java_2.11

    private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
            StreamEdge edge,
            int outputIndex,
            Environment environment,
            String taskName,
            long bufferTimeout) {
        @SuppressWarnings("unchecked")
        StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

        LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
        ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

        // we initialize the partitioner here with the number of key groups (aka max. parallelism)
        if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
            int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
            if (0 < numKeyGroups) {
                ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
            }
        }

        StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
            new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
    }

Code example source: org.apache.flink/flink-streaming-java

    private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
            StreamEdge edge,
            int outputIndex,
            Environment environment,
            String taskName,
            long bufferTimeout) {
        @SuppressWarnings("unchecked")
        StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

        LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
        ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

        // we initialize the partitioner here with the number of key groups (aka max. parallelism)
        if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
            int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
            if (0 < numKeyGroups) {
                ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
            }
        }

        StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
            new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
    }

Code example source: com.alibaba.blink/flink-runtime

    // Fragment from output setup code: one writer is configured and created per output gate,
    // indexed by outputOffset + i.
    task.getEnvironment().getWriter(outputOffset + i).setTypeSerializer(serializerFactory.getSerializer());
    task.getEnvironment().getWriter(outputOffset + i).setParentTask(task);

    new RecordWriter<T>(task.getEnvironment().getWriter(outputOffset + i),
        oe, strategy == ShipStrategyType.BROADCAST);

Code example source: org.apache.flink/flink-runtime

    // Fragment: a RecordWriter is created for the output gate at index outputOffset + i.
    new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);

Code example source: org.apache.flink/flink-runtime_2.10

    // Fragment: a RecordWriter is created for the output gate at index outputOffset + i.
    new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);

Code example source: org.apache.flink/flink-runtime_2.11

    // Fragment: a RecordWriter is created for the output gate at index outputOffset + i.
    new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);

Code example source: org.apache.flink/flink-streaming-java_2.10

    private <T> RecordWriterOutput<T> createStreamOutput(
            StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
            Environment taskEnvironment,
            String taskName) {
        OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput

        TypeSerializer outSerializer = null;

        if (edge.getOutputTag() != null) {
            // side output
            outSerializer = upStreamConfig.getTypeSerializerSideOut(
                    edge.getOutputTag(), taskEnvironment.getUserClassLoader());
        } else {
            // main output
            outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
        }

        @SuppressWarnings("unchecked")
        StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();

        LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
        ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);

        // we initialize the partitioner here with the number of key groups (aka max. parallelism)
        if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
            int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
            if (0 < numKeyGroups) {
                ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
            }
        }

        StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
            new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
        output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());

        return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
    }
