本文整理了Java中org.apache.flink.runtime.execution.Environment.getUserClassLoader()方法的一些代码示例,展示了Environment.getUserClassLoader()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Environment.getUserClassLoader()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getUserClassLoader
[英]Returns the user code class loader
[中]返回用户代码类加载器
代码示例来源:origin: apache/flink
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, with {@code true} passed as the
 * asynchronous-snapshots flag.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {
    // Asynchronous snapshots are the default for RocksDB; eventually there can be an
    // operator state backend based on RocksDB, too.
    return new DefaultOperatorStateBackend(
            env.getUserClassLoader(),
            env.getExecutionConfig(),
            true);
}
代码示例来源:origin: apache/flink
/**
 * Creates one record writer for every output edge listed in the given stream configuration.
 *
 * <p>Edges are processed in the order returned by {@code getOutEdgesInOrder}; the position of
 * an edge in that order is handed to {@code createRecordWriter} as its index. The buffer
 * timeout of each writer is taken from the chained task config registered under the edge's
 * source id.
 *
 * @param configuration stream configuration providing the out edges and chained task configs
 * @param environment task environment providing the user code class loader and the task name
 * @param <OUT> element type of the stream records written
 * @return the record writers, in the same order as the out edges
 */
@VisibleForTesting
public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> writers = new ArrayList<>();

    List<StreamEdge> orderedOutEdges =
            configuration.getOutEdgesInOrder(environment.getUserClassLoader());
    Map<Integer, StreamConfig> configsBySourceId =
            configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());

    int outputIndex = 0;
    for (StreamEdge outEdge : orderedOutEdges) {
        // The task name is looked up per edge, before the buffer timeout, as in the
        // argument order of the writer factory call.
        String taskName = environment.getTaskInfo().getTaskName();
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> writer = createRecordWriter(
                outEdge,
                outputIndex,
                environment,
                taskName,
                configsBySourceId.get(outEdge.getSourceId()).getBufferTimeout());
        writers.add(writer);
        outputIndex++;
    }
    return writers;
}
代码示例来源:origin: apache/flink
/**
 * Creates an operator state backend whose {@code snapshot} method always fails immediately.
 *
 * <p>The returned backend is an anonymous {@code DefaultOperatorStateBackend} subclass that
 * throws from {@code snapshot} itself, before any future is returned to the caller.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a backend whose {@code snapshot} throws an {@code Exception}
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
    final ClassLoader userClassLoader = env.getUserClassLoader();
    return new DefaultOperatorStateBackend(userClassLoader, env.getExecutionConfig(), true) {
        @Nonnull
        @Override
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory streamFactory,
                @Nonnull CheckpointOptions checkpointOptions) throws Exception {
            // Fail in the snapshot call itself rather than inside a returned future.
            throw new Exception("Sync part snapshot exception.");
        }
    };
}
代码示例来源:origin: apache/flink
/**
 * Creates the output that wraps the given record writer for one outgoing edge.
 *
 * <p>If the edge carries an output tag, the side-output serializer for that tag is used;
 * otherwise the main output serializer of the upstream config is used. Both serializers are
 * resolved with the task environment's user code class loader.
 *
 * @param recordWriter writer the created output emits through
 * @param edge outgoing edge; its output tag is {@code null} when it is not a side output
 * @param upStreamConfig configuration from which the serializer is read
 * @param taskEnvironment environment supplying the user code class loader
 * @return a new {@code RecordWriterOutput} for the edge, created with this instance as its
 *     last constructor argument
 */
private RecordWriterOutput<OUT> createStreamOutput(
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
        StreamEdge edge,
        StreamConfig upStreamConfig,
        Environment taskEnvironment) {
    // Read the tag once and reuse it; it is null if the edge is not a side output.
    final OutputTag sideOutputTag = edge.getOutputTag();
    final ClassLoader userClassLoader = taskEnvironment.getUserClassLoader();

    // Raw types are kept from the original signature usage of the serializer and the tag.
    final TypeSerializer outSerializer;
    if (sideOutputTag != null) {
        // side output
        outSerializer = upStreamConfig.getTypeSerializerSideOut(sideOutputTag, userClassLoader);
    } else {
        // main output
        outSerializer = upStreamConfig.getTypeSerializerOut(userClassLoader);
    }

    return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
代码示例来源:origin: apache/flink
/**
 * Creates an operator state backend whose snapshot future always fails when it is run.
 *
 * <p>The returned backend is an anonymous {@code DefaultOperatorStateBackend} subclass.
 * Its {@code snapshot} method returns normally, but the {@code FutureTask} it hands back
 * wraps a task that throws.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a backend whose snapshot future throws an {@code Exception} from its task
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
    final ClassLoader userClassLoader = env.getUserClassLoader();
    return new DefaultOperatorStateBackend(userClassLoader, env.getExecutionConfig(), true) {
        @Nonnull
        @Override
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory streamFactory,
                @Nonnull CheckpointOptions checkpointOptions) throws Exception {
            // The failure is deferred into the task wrapped by the returned future.
            return new FutureTask<>(() -> {
                throw new Exception("Async part snapshot exception.");
            });
        }
    };
}
代码示例来源:origin: apache/flink
/**
 * Creates the runtime context for the given stream operator.
 *
 * <p>Task info, user code class loader and distributed cache entries are taken from the
 * environment, while the execution config and the metric group are taken from the operator.
 *
 * @param operator operator this context belongs to
 * @param env task environment the operator runs in
 * @param accumulators accumulator map passed on to the superclass constructor
 */
public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
Environment env, Map<String, Accumulator<?, ?>> accumulators) {
super(env.getTaskInfo(),
env.getUserClassLoader(),
operator.getExecutionConfig(),
accumulators,
env.getDistributedCacheEntries(),
operator.getMetricGroup());
this.operator = operator;
this.taskEnvironment = env;
// The stream config is a view built over the environment's task configuration.
this.streamConfig = new StreamConfig(env.getTaskConfiguration());
// The operator ID is stored in its string form.
this.operatorUniqueID = operator.getOperatorID().toString();
}
代码示例来源:origin: apache/flink
env.getUserClassLoader(),
instanceBasePath,
getDbOptions(),
代码示例来源:origin: apache/flink
/**
 * Creates the timer service manager for a keyed state backend and restores its state from
 * the given raw keyed state streams.
 *
 * <p>Every stream provider must belong to a key group inside the backend's key group range;
 * otherwise {@code Preconditions.checkArgument} rejects it.
 *
 * @param keyedStatedBackend keyed state backend; when {@code null}, no manager is created
 * @param keyContext key context handed to the manager (the operator)
 * @param rawKeyedStates raw keyed state stream providers, one per key group to restore
 * @param <K> type of the key
 * @return the restored timer service manager, or {@code null} if there is no keyed backend
 * @throws Exception declared by the method signature
 */
protected <K> InternalTimeServiceManager<K> internalTimeServiceManager(
        AbstractKeyedStateBackend<K> keyedStatedBackend,
        KeyContext keyContext, //the operator
        Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) throws Exception {

    // Without a keyed backend there is nothing to manage.
    if (keyedStatedBackend == null) {
        return null;
    }

    final KeyGroupRange localRange = keyedStatedBackend.getKeyGroupRange();

    final InternalTimeServiceManager<K> manager = new InternalTimeServiceManager<>(
            localRange,
            keyContext,
            keyedStatedBackend,
            processingTimeService,
            keyedStatedBackend.requiresLegacySynchronousTimerSnapshots());

    // Initialize the timer services from each raw keyed state stream.
    for (KeyGroupStatePartitionStreamProvider provider : rawKeyedStates) {
        final int keyGroup = provider.getKeyGroupId();
        final boolean inLocalRange = localRange.contains(keyGroup);

        Preconditions.checkArgument(
                inLocalRange,
                "Key Group " + keyGroup + " does not belong to the local range.");

        manager.restoreStateForKeyGroup(
                provider.getStream(),
                keyGroup,
                environment.getUserClassLoader());
    }

    return manager;
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
/**
 * Provides the class loader for user code, as reported by this invokable's environment.
 *
 * @return the user code class loader obtained from {@code getEnvironment()}
 */
public ClassLoader getUserCodeClassLoader() {
    final ClassLoader userCodeClassLoader = getEnvironment().getUserClassLoader();
    return userCodeClassLoader;
}
代码示例来源:origin: org.apache.flink/flink-runtime
/**
 * Provides the class loader for user code, as reported by this invokable's environment.
 *
 * @return the user code class loader obtained from {@code getEnvironment()}
 */
public ClassLoader getUserCodeClassLoader() {
    final ClassLoader userCodeClassLoader = getEnvironment().getUserClassLoader();
    return userCodeClassLoader;
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
/**
 * Provides the class loader for user code, as reported by this invokable's environment.
 *
 * @return the user code class loader obtained from {@code getEnvironment()}
 */
public ClassLoader getUserCodeClassLoader() {
    final ClassLoader userCodeClassLoader = getEnvironment().getUserClassLoader();
    return userCodeClassLoader;
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Provides the class loader for user code, as reported by this invokable's environment.
 *
 * @return the user code class loader obtained from {@code getEnvironment()}
 */
public ClassLoader getUserCodeClassLoader() {
    final ClassLoader userCodeClassLoader = getEnvironment().getUserClassLoader();
    return userCodeClassLoader;
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, plus this object's {@code asynchronousSnapshots}
 * setting.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {
    final ClassLoader userClassLoader = env.getUserClassLoader();
    return new DefaultOperatorStateBackend(
            userClassLoader,
            env.getExecutionConfig(),
            asynchronousSnapshots);
}
代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.10
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, with {@code true} passed as the
 * asynchronous-snapshots flag.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {
    // Asynchronous snapshots are the default for RocksDB; eventually there can be an
    // operator state backend based on RocksDB, too.
    return new DefaultOperatorStateBackend(
            env.getUserClassLoader(),
            env.getExecutionConfig(),
            true);
}
代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, with {@code true} passed as the
 * asynchronous-snapshots flag.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {
    // Asynchronous snapshots are the default for RocksDB; eventually there can be an
    // operator state backend based on RocksDB, too.
    return new DefaultOperatorStateBackend(
            env.getUserClassLoader(),
            env.getExecutionConfig(),
            true);
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, plus this object's {@code asynchronousSnapshots}
 * setting.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {
    final ClassLoader userClassLoader = env.getUserClassLoader();
    return new DefaultOperatorStateBackend(
            userClassLoader,
            env.getExecutionConfig(),
            asynchronousSnapshots);
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, plus the value returned by
 * {@code isUsingAsynchronousSnapshots()}.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) {
    final ClassLoader userClassLoader = env.getUserClassLoader();
    return new DefaultOperatorStateBackend(
            userClassLoader,
            env.getExecutionConfig(),
            isUsingAsynchronousSnapshots());
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, plus the value returned by
 * {@code isUsingAsynchronousSnapshots()}.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) {
    final ClassLoader userClassLoader = env.getUserClassLoader();
    return new DefaultOperatorStateBackend(
            userClassLoader,
            env.getExecutionConfig(),
            isUsingAsynchronousSnapshots());
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
/**
 * Creates the operator state backend for the given task environment.
 *
 * <p>The backend is a {@code DefaultOperatorStateBackend} built from the environment's user
 * code class loader and execution config, plus the value returned by
 * {@code isUsingAsynchronousSnapshots()}.
 *
 * @param env environment supplying the user code class loader and the execution config
 * @param operatorIdentifier identifier of the operator; not read by this implementation
 * @return a newly constructed {@code DefaultOperatorStateBackend}
 * @throws Exception declared by the method signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {
    final ClassLoader userClassLoader = env.getUserClassLoader();
    return new DefaultOperatorStateBackend(
            userClassLoader,
            env.getExecutionConfig(),
            isUsingAsynchronousSnapshots());
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
/**
 * Creates the runtime context for the given stream operator.
 *
 * <p>Task info, user code class loader and distributed cache entries are taken from the
 * environment, while the execution config and the metric group are taken from the operator.
 *
 * @param operator operator this context belongs to
 * @param env task environment the operator runs in
 * @param accumulators accumulator map passed on to the superclass constructor
 */
public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
Environment env, Map<String, Accumulator<?, ?>> accumulators) {
super(env.getTaskInfo(),
env.getUserClassLoader(),
operator.getExecutionConfig(),
accumulators,
env.getDistributedCacheEntries(),
operator.getMetricGroup());
this.operator = operator;
this.taskEnvironment = env;
// The stream config is a view built over the environment's task configuration.
this.streamConfig = new StreamConfig(env.getTaskConfiguration());
}
内容来源于网络,如有侵权,请联系作者删除!