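This article collects code examples for the Java class org.apache.flink.runtime.execution.Environment and shows how the Environment class is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Environment class are as follows: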
Fully qualified name: org.apache.flink.runtime.execution.Environment
Class name: Environment
The Environment gives the code executed in a task access to the task's properties (such as name and parallelism), the configurations, the data stream readers and writers, and the various components provided by the TaskManager, such as the memory manager and the I/O manager.
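To make the description above concrete before the real excerpts, here is a minimal sketch of the pattern: task code receives one object and reads its name, parallelism, and configuration from it. `MiniEnvironment`, `describe`, `fixedEnvironment`, and the `checkpoint.mode` key are hypothetical stand-ins invented for illustration; they are not part of Flink's actual `Environment` interface, which is considerably larger.

```java
import java.util.HashMap;
import java.util.Map;

public class EnvironmentSketch {

    /** Hypothetical, heavily reduced stand-in for Flink's Environment (not the real interface). */
    public interface MiniEnvironment {
        String getTaskName();
        int getParallelism();
        Map<String, String> getTaskConfiguration();
    }

    /** Task-side code: everything it needs is read from the environment it is handed. */
    public static String describe(MiniEnvironment env) {
        // "checkpoint.mode" is an invented key, used only to show a configuration lookup.
        String mode = env.getTaskConfiguration().getOrDefault("checkpoint.mode", "EXACTLY_ONCE");
        return env.getTaskName() + " x" + env.getParallelism() + " [" + mode + "]";
    }

    /** Builds an environment with fixed values, playing the role the TaskManager plays at runtime. */
    public static MiniEnvironment fixedEnvironment(String name, int parallelism, Map<String, String> config) {
        return new MiniEnvironment() {
            @Override public String getTaskName() { return name; }
            @Override public int getParallelism() { return parallelism; }
            @Override public Map<String, String> getTaskConfiguration() { return config; }
        };
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("checkpoint.mode", "AT_LEAST_ONCE");
        System.out.println(describe(fixedEnvironment("Map", 4, config)));
    }
}
```

The real excerpts below follow the same shape: a task or state backend is handed an `Environment` and pulls class loaders, I/O managers, metric groups, and configuration from it rather than constructing them itself.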
Code example source: apache/flink
@Override
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<>(
inputGates,
inSerializer,
this,
configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
Code example source: apache/flink
public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
Environment env, Map<String, Accumulator<?, ?>> accumulators) {
super(env.getTaskInfo(),
env.getUserClassLoader(),
operator.getExecutionConfig(),
accumulators,
env.getDistributedCacheEntries(),
operator.getMetricGroup());
this.operator = operator;
this.taskEnvironment = env;
this.streamConfig = new StreamConfig(env.getTaskConfiguration());
this.operatorUniqueID = operator.getOperatorID().toString();
}
Code example source: apache/flink
@Override
public void init() throws Exception {
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = getConfiguration().getIterationWaitTime();
LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
@SuppressWarnings("unchecked")
BlockingQueue<StreamRecord<IN>> dataChannel =
(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
this.headOperator = new RecordPusher<>();
this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
// call super.init() last because that needs this.headOperator to be set up
super.init();
}
Code example source: apache/flink
@Override
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier) throws Exception {
// the default for RocksDB; eventually there can be an operator state backend based on RocksDB, too.
final boolean asyncSnapshots = true;
return new DefaultOperatorStateBackend(
env.getUserClassLoader(),
env.getExecutionConfig(),
asyncSnapshots);
}
Code example source: apache/flink
this.jobId = env.getJobID();
initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
Code example source: org.apache.flink/flink-runtime_2.11
public DistributedRuntimeUDFContext createRuntimeContext() {
Environment env = getEnvironment();
String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
}
}
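The operator name passed to the metric group in the excerpt above comes from two string operations on the task name. The sketch below isolates just that logic in plain Java so it can be run without Flink; the class name, the method name `firstOperatorName`, and the sample task names are assumptions made for illustration.

```java
public class SourceNameSketch {

    /**
     * Same string handling as in the excerpt: keep the text before the first "->",
     * then drop a leading "CHAIN " prefix (6 characters) if present.
     */
    public static String firstOperatorName(String taskName) {
        String sourceName = taskName.split("->")[0].trim();
        return sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
    }

    public static void main(String[] args) {
        // Sample task names are invented for this sketch.
        System.out.println(firstOperatorName("CHAIN DataSource (at main) -> Map (Map at main)"));
        System.out.println(firstOperatorName("DataSource (at main)"));
    }
}
```

Both calls print `DataSource (at main)`. Note that `substring(6)` assumes the prefix is followed by a space and a name; a task name consisting only of `CHAIN` would throw a `StringIndexOutOfBoundsException` with this logic.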
Code example source: apache/flink
InputGate reader = getEnvironment().getInputGate(i);
switch (inputType) {
case 1:
configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
input1WatermarkGauge,
input2WatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);
Code example source: com.alibaba.blink/flink-runtime
getEnvironment().getInputGate(0),
getEnvironment().getTaskManagerInfo().getTmpDirectories(),
getEnvironment().getTaskManagerInfo().getConfiguration());
} else if (groupSize > 1){
new UnionInputGate(getEnvironment().getAllInputGates()),
getEnvironment().getTaskManagerInfo().getTmpDirectories(),
getEnvironment().getTaskManagerInfo().getConfiguration());
} else {
throw new Exception("Illegal input group size in task configuration: " + groupSize);
Code example source: apache/flink
String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
ensureRocksDBIsLoaded(tempDir);
env.getTaskStateManager().createLocalRecoveryConfig();
env.getUserClassLoader(),
instanceBasePath,
getDbOptions(),
numberOfKeyGroups,
keyGroupRange,
env.getExecutionConfig(),
isIncrementalCheckpointsEnabled(),
getNumberOfTransferingThreads(),
Code example source: com.alibaba.blink/flink-runtime
@Override
public AbstractInternalStateBackend createInternalStateBackend(
Environment env,
String operatorIdentifier,
int numberOfGroups,
KeyGroupRange keyGroupRange) {
return new HeapInternalStateBackend(
numberOfGroups,
keyGroupRange,
env.getUserClassLoader(),
env.getTaskStateManager().createLocalRecoveryConfig(),
env.getTaskKvStateRegistry(),
isUsingAsynchronousSnapshots(),
env.getExecutionConfig()
);
}
Code example source: apache/flink
@Override
public void invoke() throws Exception {
RecordReader<SpeedTestRecord> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
SpeedTestRecord.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());
try {
boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
int numRecords = 0;
while (reader.next() != null) {
if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
Thread.sleep(IS_SLOW_SLEEP_MS);
}
}
}
finally {
reader.clearBuffers();
}
}
}
Code example source: com.alibaba.blink/flink-runtime
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) {
TaskStateManager taskStateManager = env.getTaskStateManager();
LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializer,
env.getUserClassLoader(),
numberOfKeyGroups,
keyGroupRange,
isUsingAsynchronousSnapshots(),
env.getExecutionConfig(),
localRecoveryConfig);
}
Code example source: apache/flink
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
StreamEdge edge,
int outputIndex,
Environment environment,
String taskName,
long bufferTimeout) {
@SuppressWarnings("unchecked")
StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
// we initialize the partitioner here with the number of key groups (aka max. parallelism)
if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
if (0 < numKeyGroups) {
((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
}
}
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
return output;
}
}
Code example source: apache/flink
@Override
public void invoke() throws Exception {
RecordReader<SpeedTestRecord> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
SpeedTestRecord.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());
RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));
try {
SpeedTestRecord record;
while ((record = reader.next()) != null) {
writer.emit(record);
}
}
finally {
reader.clearBuffers();
writer.clearBuffers();
writer.flushAll();
}
}
}
Code example source: com.alibaba.blink/flink-table
@Override
public void open() throws Exception {
super.open();
LOG.info("Opening SortOperator");
cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
TypeSerializer<BaseRow> inputSerializer = getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
this.binarySerializer =
new BinaryRowSerializer(((AbstractRowSerializer) inputSerializer).getTypes());
MemoryManager memManager = this.getContainingTask().getEnvironment().getMemoryManager();
IOManager ioManager = this.getContainingTask().getEnvironment().getIOManager();
NormalizedKeyComputer computer = computerClass.newInstance();
RecordComparator comparator = comparatorClass.newInstance();
computer.init(gSorter.serializers(), gSorter.comparators());
comparator.init(gSorter.serializers(), gSorter.comparators());
this.sorter = new BinaryExternalSorter(this.getContainingTask(), memManager, reservedMemorySize,
maxMemorySize, perRequestMemorySize, ioManager, inputSerializer, binarySerializer,
computer, comparator, getSqlConf());
this.sorter.startThreads();
gSorter = null;
collector = new StreamRecordCollector<>(output);
// register the metrics.
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}
Code example source: apache/flink
private void tryShutdownTimerService() {
if (timerService != null && !timerService.isTerminated()) {
try {
final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
"timers. Will continue with shutdown procedure.", timeoutMs);
}
} catch (Throwable t) {
// catch and log the exception to not replace the original exception
LOG.error("Could not shut down timer service", t);
}
}
}
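The excerpt above reads a timeout from the TaskManager configuration and gives the timer service a bounded amount of time to finish before shutdown continues. The sketch below shows the same idea with only the JDK, assuming a `ScheduledExecutorService` in place of Flink's timer service; `shutdownWithTimeout` is a hypothetical helper, and unlike Flink's `shutdownServiceUninterruptible` it stops waiting when the calling thread is interrupted.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimerShutdownSketch {

    /** Returns true if the service is (or becomes) terminated within timeoutMs milliseconds. */
    public static boolean shutdownWithTimeout(ScheduledExecutorService timers, long timeoutMs) {
        if (timers == null || timers.isTerminated()) {
            return true; // nothing to shut down
        }
        timers.shutdown(); // stop accepting new timers, let already running ones finish
        try {
            if (!timers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                System.err.println("Timer service shutdown exceeded time limit of "
                        + timeoutMs + " ms. Will continue with shutdown procedure.");
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        timers.schedule(() -> System.out.println("timer fired"), 10, TimeUnit.MILLISECONDS);
        System.out.println("terminated in time: " + shutdownWithTimeout(timers, 1000));
    }
}
```

As in the excerpt, a timeout is reported but does not abort the surrounding shutdown; the boolean result only tells the caller whether the wait succeeded.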
Code example source: org.apache.flink/flink-streaming-java_2.10
private <T> RecordWriterOutput<T> createStreamOutput(
StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
Environment taskEnvironment,
String taskName) {
OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
TypeSerializer outSerializer = null;
if (edge.getOutputTag() != null) {
// side output
outSerializer = upStreamConfig.getTypeSerializerSideOut(
edge.getOutputTag(), taskEnvironment.getUserClassLoader());
} else {
// main output
outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
}
@SuppressWarnings("unchecked")
StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
// we initialize the partitioner here with the number of key groups (aka max. parallelism)
if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
if (0 < numKeyGroups) {
((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
}
}
StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
}
Code example source: apache/flink
TaskInfo taskInfo = environment.getTaskInfo();
() -> stateBackend.createKeyedStateBackend(
environment,
environment.getJobID(),
operatorIdentifierText,
keySerializer,
taskInfo.getMaxNumberOfParallelSubtasks(),
keyGroupRange,
environment.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
metricGroup),
Code example source: apache/flink
@VisibleForTesting
public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
StreamConfig configuration,
Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge edge = outEdgesInOrder.get(i);
recordWriters.add(
createRecordWriter(
edge,
i,
environment,
environment.getTaskInfo().getTaskName(),
chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
}
return recordWriters;
}
Code example source: com.alibaba.blink/flink-table
IOManager ioManager = getContainingTask().getEnvironment().getIOManager();
getContainingTask().getEnvironment().getTaskConfiguration().getBoolean(
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
buildSerializer, probeSerializer,
buildProjectionClass.newInstance(), probeProjectionClass.newInstance(),
getContainingTask().getEnvironment().getMemoryManager(),
parameter.reservedMemorySize,
parameter.maxMemorySize,