本文整理了Java中org.apache.flink.configuration.Configuration.addAll()
方法的一些代码示例,展示了Configuration.addAll()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.addAll()
方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:addAll
[英]Adds all entries from the given configuration into this configuration. The keys are prepended with the given prefix.
[中]将给定配置中的所有条目添加到此配置中。这些键的前缀是给定的。
代码示例来源:origin: apache/flink
@Override
public void addAll(Configuration other, String prefix) {
this.backingConfig.addAll(other, this.prefix + prefix);
}
代码示例来源:origin: apache/flink
public void appendConfiguration(Configuration config) throws IOException {
final Configuration mergedConfig = new Configuration();
mergedConfig.addAll(defaultConfig);
mergedConfig.addAll(config);
final List<String> configurationLines = mergedConfig.toMap().entrySet().stream()
.map(entry -> entry.getKey() + ": " + entry.getValue())
.collect(Collectors.toList());
Files.write(conf.resolve("flink-conf.yaml"), configurationLines);
}
代码示例来源:origin: apache/flink
/**
* Getter which returns a copy of the associated configuration.
*
* @return Copy of the associated configuration
*/
public Configuration getConfiguration() {
Configuration copiedConfiguration = new Configuration();
copiedConfiguration.addAll(configuration);
return copiedConfiguration;
}
代码示例来源:origin: apache/flink
/**
* Specify a custom {@code Configuration} that will be used when creating
* the {@link FileSystem} for writing.
*/
public RollingSink<T> setFSConfig(Configuration config) {
this.fsConfig = new Configuration();
fsConfig.addAll(config);
return this;
}
代码示例来源:origin: apache/flink
@Override
public Configuration clone() {
Configuration config = new Configuration();
config.addAll(this);
return config;
}
代码示例来源:origin: apache/flink
/**
* Specify a custom {@code Configuration} that will be used when creating
* the {@link FileSystem} for writing.
*/
public BucketingSink<T> setFSConfig(Configuration config) {
this.fsConfig = new Configuration();
fsConfig.addAll(config);
return this;
}
代码示例来源:origin: apache/flink
configuration.addAll(dynamicProperties);
代码示例来源:origin: apache/flink
public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties)
throws Exception {
// generate a container spec which conveys the artifacts/vars needed to launch a TM
ContainerSpecification spec = new ContainerSpecification();
// propagate the AM dynamic configuration to the TM
spec.getDynamicConfiguration().addAll(dynamicProperties);
applyOverlays(configuration, spec);
return spec;
}
代码示例来源:origin: apache/flink
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
String name = this.name != null ? this.name : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")";
if (name.length() > 150) {
name = name.substring(0, 150);
}
@SuppressWarnings({"unchecked", "rawtypes"})
GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
new OperatorInformation<OUT>(getType()), name);
source.setParallelism(parallelism);
if (this.parameters != null) {
source.getParameters().addAll(this.parameters);
}
if (this.splitDataProperties != null) {
source.setSplitDataProperties(this.splitDataProperties);
}
return source;
}
代码示例来源:origin: apache/flink
public void setHeadTask(JobVertex headTask, TaskConfig headConfig) {
this.headTask = headTask;
this.headFinalResultConfig = new TaskConfig(new Configuration());
// check if we already had a configuration, for example if the solution set was
if (this.headConfig != null) {
headConfig.getConfiguration().addAll(this.headConfig.getConfiguration());
}
this.headConfig = headConfig;
}
代码示例来源:origin: apache/flink
private Configuration createConfiguration() {
Configuration newConfiguration = new Configuration();
newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
newConfiguration.addAll(baseConfiguration);
return newConfiguration;
}
代码示例来源:origin: apache/flink
private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
@SuppressWarnings("unchecked")
SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;
@SuppressWarnings("unchecked")
DataSet<I> typedInput = (DataSet<I>) op.getInput();
Operator<I> input = translate(typedInput);
org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);
if (op instanceof UdfOperator<?>) {
@SuppressWarnings("unchecked")
SingleInputUdfOperator<I, O, ?> udfOp = (SingleInputUdfOperator<I, O, ?>) op;
// set configuration parameters
Configuration opParams = udfOp.getParameters();
if (opParams != null) {
dataFlowOp.getParameters().addAll(opParams);
}
if (dataFlowOp instanceof org.apache.flink.api.common.operators.SingleInputOperator) {
org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> unaryOp =
(org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?>) dataFlowOp;
// set the semantic properties
unaryOp.setSemanticProperties(udfOp.getSemanticProperties());
}
}
return dataFlowOp;
}
代码示例来源:origin: apache/flink
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
configuration.addAll(this.configuration);
代码示例来源:origin: apache/flink
private <I1, I2, O> org.apache.flink.api.common.operators.Operator<O> translateTwoInputOperator(TwoInputOperator<?, ?, ?, ?> op) {
@SuppressWarnings("unchecked")
TwoInputOperator<I1, I2, O, ?> typedOp = (TwoInputOperator<I1, I2, O, ?>) op;
@SuppressWarnings("unchecked")
DataSet<I1> typedInput1 = (DataSet<I1>) op.getInput1();
@SuppressWarnings("unchecked")
DataSet<I2> typedInput2 = (DataSet<I2>) op.getInput2();
Operator<I1> input1 = translate(typedInput1);
Operator<I2> input2 = translate(typedInput2);
org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input1, input2);
if (op instanceof UdfOperator<?>) {
@SuppressWarnings("unchecked")
TwoInputUdfOperator<I1, I2, O, ?> udfOp = (TwoInputUdfOperator<I1, I2, O, ?>) op;
// set configuration parameters
Configuration opParams = udfOp.getParameters();
if (opParams != null) {
dataFlowOp.getParameters().addAll(opParams);
}
if (dataFlowOp instanceof org.apache.flink.api.common.operators.DualInputOperator) {
org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?> binaryOp =
(org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?>) dataFlowOp;
// set the semantic properties
binaryOp.setSemanticProperties(udfOp.getSemanticProperties());
}
}
return dataFlowOp;
}
代码示例来源:origin: apache/flink
dynamicProperties.addAll(containerSpec.getDynamicConfiguration());
代码示例来源:origin: apache/flink
configuration.addAll(clientConfiguration);
代码示例来源:origin: apache/flink
protected GenericDataSinkBase<T> translateToDataFlow(Operator<T> input) {
// select the name (or create a default one)
String name = this.name != null ? this.name : this.format.toString();
GenericDataSinkBase<T> sink = new GenericDataSinkBase<>(this.format, new UnaryOperatorInformation<>(this.type, new NothingTypeInfo()), name);
// set input
sink.setInput(input);
// set parameters
if (this.parameters != null) {
sink.getParameters().addAll(this.parameters);
}
// set parallelism
if (this.parallelism > 0) {
// use specified parallelism
sink.setParallelism(this.parallelism);
} else {
// if no parallelism has been specified, use parallelism of input operator to enable chaining
sink.setParallelism(input.getParallelism());
}
if (this.sortKeyPositions != null) {
// configure output sorting
Ordering ordering = new Ordering();
for (int i = 0; i < this.sortKeyPositions.length; i++) {
ordering.appendOrdering(this.sortKeyPositions[i], null, this.sortOrders[i]);
}
sink.setLocalOrder(ordering);
}
return sink;
}
代码示例来源:origin: apache/flink
@Before
public void setUp() throws Exception {
final Configuration clientConfig = new Configuration();
clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
clientConfig.setLong(RestOptions.RETRY_DELAY, 0);
clientConfig.addAll(CLUSTER.getClientConfiguration());
client = new RestClusterClient<>(
clientConfig,
StandaloneClusterId.getInstance()
);
}
代码示例来源:origin: apache/flink
config.addAll(jobGraph.getJobConfiguration());
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
代码示例来源:origin: uber/AthenaX
static LocalFlinkMiniCluster execute(LocalStreamEnvironment env,
Configuration conf, String jobName) throws Exception {
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setJobName(jobName);
JobGraph jobGraph = streamGraph.getJobGraph();
Configuration configuration = new Configuration(conf);
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong("taskmanager.memory.size", -1L);
configuration.setInteger("taskmanager.numberOfTaskSlots", jobGraph.getMaximumParallelism());
LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, true);
cluster.start();
cluster.submitJobDetached(jobGraph);
return cluster;
}
}
内容来源于网络,如有侵权,请联系作者删除!