This article collects Java code examples of the org.apache.flink.runtime.execution.Environment.getWriter() method and shows how Environment.getWriter() is used in practice. The examples are extracted from selected open-source projects on GitHub/Stackoverflow/Maven and should serve as a useful reference. Details of the Environment.getWriter() method:
Package: org.apache.flink.runtime.execution
Class: Environment
Method: getWriter
Description: none is provided by the source. As the examples below show, getWriter(int index) returns the ResultPartitionWriter for the output gate at the given index, which callers typically wrap in a RecordWriter.
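Before the collected examples, here is a minimal sketch of the typical pattern. It is not taken from any of the projects below; the class name WriterSketchTask and the use of a single output gate are illustrative assumptions, and the exact RecordWriter/AbstractInvokable constructors vary across Flink versions (the ones used here mirror the examples in this article).

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

// Hypothetical task class, only to show where getWriter() is usually called.
public class WriterSketchTask extends AbstractInvokable {

    public WriterSketchTask(Environment environment) {
        super(environment);
    }

    @Override
    public void invoke() throws Exception {
        // getWriter(0) returns the ResultPartitionWriter of the task's first output gate.
        ResultPartitionWriter partitionWriter = getEnvironment().getWriter(0);

        // As in the examples below, the partition writer is wrapped in a RecordWriter
        // so the task can emit records to it.
        RecordWriter<IOReadableWritable> writer = new RecordWriter<>(partitionWriter);
        try {
            // writer.emit(record); // emit records here
        } finally {
            writer.clearBuffers();
            writer.flushAll();
        }
    }
}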
Code example source: apache/flink
public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
    super(inputBase.getEnvironment().getWriter(0));
}
Code example source: apache/flink
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
        StreamEdge edge,
        int outputIndex,
        Environment environment,
        String taskName,
        long bufferTimeout) {
    @SuppressWarnings("unchecked")
    StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
    ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

    // we initialize the partitioner here with the number of key groups (aka max. parallelism)
    if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
        int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
        if (0 < numKeyGroups) {
            ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
        }
    }

    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
        RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
    output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return output;
}
Code example source: apache/flink
@Override
public void invoke() throws Exception {
    RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));

    try {
        // Determine the amount of data to send per subtask
        int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);

        long dataMbPerSubtask = (dataVolumeGb * 10) / getCurrentNumberOfSubtasks();
        long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;

        LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
            getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
            SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));

        boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);

        int numRecords = 0;
        SpeedTestRecord record = new SpeedTestRecord();
        for (long i = 0; i < numRecordsToEmit; i++) {
            if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
                Thread.sleep(IS_SLOW_SLEEP_MS);
            }

            writer.emit(record);
        }
    }
    finally {
        writer.clearBuffers();
        writer.flushAll();
    }
}
Code example source: apache/flink
@Override
public void invoke() throws Exception {
    RecordReader<SpeedTestRecord> reader = new RecordReader<>(
            getEnvironment().getInputGate(0),
            SpeedTestRecord.class,
            getEnvironment().getTaskManagerInfo().getTmpDirectories());

    RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));

    try {
        SpeedTestRecord record;
        while ((record = reader.next()) != null) {
            writer.emit(record);
        }
    }
    finally {
        reader.clearBuffers();
        writer.clearBuffers();
        writer.flushAll();
    }
}
Code example source: org.apache.flink/flink-runtime
@Override
protected void initOutputs() throws Exception {
    // initialize the regular outputs first (the ones into the step function).
    super.initOutputs();

    // at this time, the outputs to the step function are created
    // add the outputs for the final solution
    List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
    final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
    final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
    this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
        userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

    // sanity check the setup
    final int writersIntoStepFunction = this.eventualOutputs.size();
    final int writersIntoFinalResult = finalOutputWriters.size();
    final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

    if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
        throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
    }

    // now, we can instantiate the sync gate
    this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
    this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
}
Code example source: org.apache.flink/flink-runtime_2.10
@Override
protected void initOutputs() throws Exception {
    // initialize the regular outputs first (the ones into the step function).
    super.initOutputs();

    // at this time, the outputs to the step function are created
    // add the outputs for the final solution
    List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
    final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
    final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
    this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
        userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

    // sanity check the setup
    final int writersIntoStepFunction = this.eventualOutputs.size();
    final int writersIntoFinalResult = finalOutputWriters.size();
    final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

    if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
        throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
    }

    // now, we can instantiate the sync gate
    this.toSync = getEnvironment().getWriter(syncGateIndex);
}
Code example source: org.apache.flink/flink-runtime_2.11
@Override
protected void initOutputs() throws Exception {
    // initialize the regular outputs first (the ones into the step function).
    super.initOutputs();

    // at this time, the outputs to the step function are created
    // add the outputs for the final solution
    List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
    final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
    final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
    this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
        userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

    // sanity check the setup
    final int writersIntoStepFunction = this.eventualOutputs.size();
    final int writersIntoFinalResult = finalOutputWriters.size();
    final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

    if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
        throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
    }

    // now, we can instantiate the sync gate
    this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
    this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
}
Code example source: com.alibaba.blink/flink-runtime
@Override
protected void initOutputs() throws Exception {
    // initialize the regular outputs first (the ones into the step function).
    super.initOutputs();

    // at this time, the outputs to the step function are created
    // add the outputs for the final solution
    List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
    final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
    final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
    this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
        userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());

    // sanity check the setup
    final int writersIntoStepFunction = this.eventualOutputs.size();
    final int writersIntoFinalResult = finalOutputWriters.size();
    final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();

    if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
        throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
    }

    // now, we can instantiate the sync gate
    this.toSync = new RecordWriter<>(getEnvironment().getWriter(syncGateIndex));
    this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
}
Code example source: org.apache.flink/flink-streaming-java_2.11
private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
        StreamEdge edge,
        int outputIndex,
        Environment environment,
        String taskName,
        long bufferTimeout) {
    @SuppressWarnings("unchecked")
    StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
    ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

    // we initialize the partitioner here with the number of key groups (aka max. parallelism)
    if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
        int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
        if (0 < numKeyGroups) {
            ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
        }
    }

    StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
        new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
    output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return output;
}
Code example source: org.apache.flink/flink-streaming-java
private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
        StreamEdge edge,
        int outputIndex,
        Environment environment,
        String taskName,
        long bufferTimeout) {
    @SuppressWarnings("unchecked")
    StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
    ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

    // we initialize the partitioner here with the number of key groups (aka max. parallelism)
    if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
        int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
        if (0 < numKeyGroups) {
            ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
        }
    }

    StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
        new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
    output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return output;
}
Code example source: com.alibaba.blink/flink-runtime
task.getEnvironment().getWriter(outputOffset + i).setTypeSerializer(serializerFactory.getSerializer());
task.getEnvironment().getWriter(outputOffset + i).setParentTask(task);

new RecordWriter<T>(task.getEnvironment().getWriter(outputOffset + i),
    oe, strategy == ShipStrategyType.BROADCAST);
Code example source: org.apache.flink/flink-runtime
new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
Code example source: org.apache.flink/flink-runtime_2.10
new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
Code example source: org.apache.flink/flink-runtime_2.11
new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
Code example source: org.apache.flink/flink-streaming-java_2.10
private <T> RecordWriterOutput<T> createStreamOutput(
        StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
        Environment taskEnvironment,
        String taskName) {
    OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput

    TypeSerializer outSerializer = null;

    if (edge.getOutputTag() != null) {
        // side output
        outSerializer = upStreamConfig.getTypeSerializerSideOut(
                edge.getOutputTag(), taskEnvironment.getUserClassLoader());
    } else {
        // main output
        outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
    }

    @SuppressWarnings("unchecked")
    StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
    ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);

    // we initialize the partitioner here with the number of key groups (aka max. parallelism)
    if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
        int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
        if (0 < numKeyGroups) {
            ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
        }
    }

    StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
            new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
    output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());

    return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
}