本文整理了Java中com.alibaba.datax.common.util.Configuration.newDefault()
方法的一些代码示例,展示了Configuration.newDefault()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.newDefault()
方法的具体详情如下:
包路径:com.alibaba.datax.common.util.Configuration
类名称:Configuration
方法名:newDefault
[英]初始化空白的Configuration
[中]初始化空白的配置
代码示例来源:origin: ECNU-1X/DataX-Masking
public List<Configuration> split(int mandatoryNumber){
LOG.info("Begin split and MandatoryNumber : {}", mandatoryNumber);
List<Configuration> configurations = new ArrayList<Configuration>();
for (int i = 0; i < mandatoryNumber; i++) {
Configuration configuration = Configuration.newDefault();
configuration.set(OTSConst.OTS_CONF, GsonParser.confToJson(this.conf));
configurations.add(configuration);
}
LOG.info("End split.");
assert(mandatoryNumber == configurations.size());
return configurations;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
private List<Configuration> mergeReaderAndWriterTaskConfigs(
List<Configuration> readerTasksConfigs,
List<Configuration> writerTasksConfigs,
List<Configuration> transformerConfigs) {
if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
readerTasksConfigs.size(), writerTasksConfigs.size())
);
}
List<Configuration> contentConfigs = new ArrayList<Configuration>();
for (int i = 0; i < readerTasksConfigs.size(); i++) {
Configuration taskConfig = Configuration.newDefault();
taskConfig.set(CoreConstant.JOB_READER_NAME,
this.readerPluginName);
taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
readerTasksConfigs.get(i));
taskConfig.set(CoreConstant.JOB_WRITER_NAME,
this.writerPluginName);
taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
writerTasksConfigs.get(i));
if(transformerConfigs!=null && transformerConfigs.size()>0){
taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
}
taskConfig.set(CoreConstant.TASK_ID, i);
contentConfigs.add(taskConfig);
}
return contentConfigs;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
public static Configuration parseOnePluginConfig(final String path,
final String type,
Set<String> pluginSet, List<String> wantPluginNames) {
String filePath = path + File.separator + "plugin.json";
Configuration configuration = Configuration.from(new File(filePath));
String pluginPath = configuration.getString("path");
String pluginName = configuration.getString("name");
if(!pluginSet.contains(pluginName)) {
pluginSet.add(pluginName);
} else {
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "插件加载失败,存在重复插件:" + filePath);
}
//不是想要的插件,返回null
if (wantPluginNames != null && wantPluginNames.size() > 0 && !wantPluginNames.contains(pluginName)) {
return null;
}
boolean isDefaultPath = StringUtils.isBlank(pluginPath);
if (isDefaultPath) {
configuration.set("path", path);
}
Configuration result = Configuration.newDefault();
result.set(
String.format("plugin.%s.%s", type, pluginName),
configuration.getInternal());
return result;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
public List<Configuration> split(int adviceNumber) {
int shardCount = streamJob.getShardIds().size();
int splitNumber = Math.min(adviceNumber, shardCount);
int splitSize = shardCount / splitNumber;
List<Configuration> configurations = new ArrayList<Configuration>();
List<String> shardIds = new ArrayList<String>(streamJob.getShardIds());
Collections.shuffle(shardIds);
int start = 0;
int end = 0;
int remain = shardCount % splitNumber;
for (int i = 0; i < splitNumber; i++) {
start = end;
end = start + splitSize;
if (remain > 0) {
end += 1;
remain -= 1;
}
Configuration configuration = Configuration.newDefault();
configuration.set(OTSStreamReaderConstants.CONF, GsonParser.configToJson(conf));
configuration.set(OTSStreamReaderConstants.STREAM_JOB, streamJob.toJson());
configuration.set(OTSStreamReaderConstants.ALL_SHARDS, GsonParser.toJson(allShards));
configuration.set(OTSStreamReaderConstants.OWNED_SHARDS, GsonParser.listToJson(shardIds.subList(start, end)));
configurations.add(configuration);
}
LOG.info("Master split to {} slave, with advice number {}.", configurations.size(), adviceNumber);
return configurations;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
public List<Configuration> split(int num) throws Exception {
LOG.info("Expect split num : " + num);
List<Configuration> configurations = new ArrayList<Configuration>();
List<OTSRange> ranges = null;
if (this.conf.getRangeSplit() != null) { // 用户显示指定了拆分范围
LOG.info("Begin userDefinedRangeSplit");
ranges = userDefinedRangeSplit(meta, range, this.conf.getRangeSplit());
LOG.info("End userDefinedRangeSplit");
} else { // 采用默认的切分算法
LOG.info("Begin defaultRangeSplit");
ranges = defaultRangeSplit(ots, meta, range, num);
LOG.info("End defaultRangeSplit");
}
// 解决大量的Split Point序列化消耗内存的问题
// 因为slave中不会使用这个配置,所以置为空
this.conf.setRangeSplit(null);
for (OTSRange item : ranges) {
Configuration configuration = Configuration.newDefault();
configuration.set(OTSConst.OTS_CONF, GsonParser.confToJson(this.conf));
configuration.set(OTSConst.OTS_RANGE, GsonParser.rangeToJson(item));
configuration.set(OTSConst.OTS_DIRECTION, GsonParser.directionToJson(direction));
configurations.add(configuration);
}
LOG.info("Configuration list count : " + configurations.size());
return configurations;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
public static Configuration parsePluginConfig(List<String> wantPluginNames) {
Configuration configuration = Configuration.newDefault();
Set<String> replicaCheckPluginSet = new HashSet<String>();
int complete = 0;
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
if(eachReaderConfig!=null) {
configuration.merge(eachReaderConfig, true);
complete += 1;
}
}
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);
if(eachWriterConfig!=null) {
configuration.merge(eachWriterConfig, true);
complete += 1;
}
}
if (wantPluginNames != null && wantPluginNames.size() > 0 && wantPluginNames.size() != complete) {
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "插件加载失败,未完成指定插件加载:" + wantPluginNames);
}
return configuration;
}
内容来源于网络,如有侵权,请联系作者删除!