com.alibaba.datax.common.util.Configuration.newDefault()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(6.4k)|赞(0)|评价(0)|浏览(123)

本文整理了Java中com.alibaba.datax.common.util.Configuration.newDefault()方法的一些代码示例,展示了Configuration.newDefault()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Configuration.newDefault()方法的具体详情如下:
包路径:com.alibaba.datax.common.util.Configuration
类名称:Configuration
方法名:newDefault

Configuration.newDefault介绍

[英]Initializes a blank Configuration
[中]初始化空白的配置

代码示例

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * Builds {@code mandatoryNumber} task configurations. Each one stores the
 * result of {@code GsonParser.confToJson(this.conf)} under
 * {@code OTSConst.OTS_CONF}.
 *
 * @param mandatoryNumber number of configurations to build
 * @return the list of built configurations
 */
public List<Configuration> split(int mandatoryNumber){
  LOG.info("Begin split and MandatoryNumber : {}", mandatoryNumber);
  List<Configuration> taskConfigs = new ArrayList<Configuration>();
  int produced = 0;
  while (produced < mandatoryNumber) {
    Configuration taskConfig = Configuration.newDefault();
    taskConfig.set(OTSConst.OTS_CONF, GsonParser.confToJson(this.conf));
    taskConfigs.add(taskConfig);
    produced++;
  }
  LOG.info("End split.");
  // Sanity check; only evaluated when assertions are enabled (-ea).
  assert(mandatoryNumber == taskConfigs.size());
  return taskConfigs;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * Combines the i-th reader task configuration with the i-th writer task
 * configuration into a single task configuration, one per index.
 *
 * <p>Each resulting configuration receives the reader/writer plugin names of
 * this instance, the matching reader/writer parameters, the transformer
 * configurations (only when the list is non-null and non-empty) and the
 * index as its task id.
 *
 * @param readerTasksConfigs per-task reader configurations
 * @param writerTasksConfigs per-task writer configurations; must have the
 *     same size as {@code readerTasksConfigs}
 * @param transformerConfigs transformer configurations attached to every
 *     task; may be {@code null} or empty
 * @return one merged configuration per reader/writer pair
 * @throws RuntimeException the exception built by
 *     {@code DataXException.asDataXException} with
 *     {@code PLUGIN_SPLIT_ERROR} when the two list sizes differ
 *     (NOTE(review): assumed unchecked since the method declares no
 *     {@code throws} clause - confirm)
 */
private List<Configuration> mergeReaderAndWriterTaskConfigs(
    List<Configuration> readerTasksConfigs,
    List<Configuration> writerTasksConfigs,
    List<Configuration> transformerConfigs) {
  final int readerCount = readerTasksConfigs.size();
  final int writerCount = writerTasksConfigs.size();
  if (readerCount != writerCount) {
    throw DataXException.asDataXException(
        FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
        String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
            readerCount, writerCount));
  }

  final boolean hasTransformers =
      transformerConfigs != null && !transformerConfigs.isEmpty();

  List<Configuration> merged = new ArrayList<Configuration>();
  for (int taskId = 0; taskId < readerCount; taskId++) {
    Configuration task = Configuration.newDefault();
    task.set(CoreConstant.JOB_READER_NAME, this.readerPluginName);
    task.set(CoreConstant.JOB_READER_PARAMETER, readerTasksConfigs.get(taskId));
    task.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName);
    task.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(taskId));
    if (hasTransformers) {
      task.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
    }
    task.set(CoreConstant.TASK_ID, taskId);
    merged.add(task);
  }
  return merged;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * Loads {@code plugin.json} from the given plugin directory and wraps its
 * content under the key {@code plugin.<type>.<name>}.
 *
 * @param path plugin directory containing {@code plugin.json}
 * @param type plugin type used as the middle segment of the result key
 * @param pluginSet names of plugins seen so far; the name read from this
 *     plugin is added to it
 * @param wantPluginNames names of the plugins to load; when {@code null} or
 *     empty, no filtering is applied
 * @return the wrapped configuration, or {@code null} when a non-empty
 *     {@code wantPluginNames} does not contain this plugin's name
 * @throws RuntimeException the exception built by
 *     {@code DataXException.asDataXException} with
 *     {@code PLUGIN_INIT_ERROR} when the plugin name is already in
 *     {@code pluginSet}
 *     (NOTE(review): assumed unchecked since the method declares no
 *     {@code throws} clause - confirm)
 */
public static Configuration parseOnePluginConfig(final String path,
                         final String type,
                         Set<String> pluginSet, List<String> wantPluginNames) {
  final String filePath = path + File.separator + "plugin.json";
  final Configuration pluginConf = Configuration.from(new File(filePath));
  final String declaredPath = pluginConf.getString("path");
  final String pluginName = pluginConf.getString("name");

  // Set.add returns false when the name is already present, i.e. a duplicate plugin.
  if (!pluginSet.add(pluginName)) {
    throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "插件加载失败,存在重复插件:" + filePath);
  }

  // Not one of the requested plugins: return null.
  final boolean filterActive = wantPluginNames != null && wantPluginNames.size() > 0;
  if (filterActive && !wantPluginNames.contains(pluginName)) {
    return null;
  }

  // Fall back to the plugin directory when plugin.json declares no path.
  if (StringUtils.isBlank(declaredPath)) {
    pluginConf.set("path", path);
  }

  final Configuration wrapped = Configuration.newDefault();
  wrapped.set(String.format("plugin.%s.%s", type, pluginName), pluginConf.getInternal());
  return wrapped;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * Distributes the shards of {@code streamJob} over at most
 * {@code adviceNumber} slave configurations.
 *
 * <p>The shard ids are shuffled and cut into contiguous slices whose sizes
 * differ by at most one: the first {@code shardCount % splitNumber} slices
 * get one extra shard. Every slave configuration carries the serialized
 * reader config, stream job and full shard list, plus its own slice of
 * shard ids.
 *
 * <p>Edge cases (previously an {@code ArithmeticException} from a division
 * by zero, or a silently empty result for a negative {@code adviceNumber}):
 * <ul>
 *   <li>{@code adviceNumber < 1} is treated as 1, so existing shards are
 *       always assigned to a slave;</li>
 *   <li>when there are no shards, an empty list is returned
 *       (NOTE(review): confirm the caller accepts an empty split
 *       result).</li>
 * </ul>
 *
 * @param adviceNumber suggested number of slaves
 * @return one configuration per slave; never more entries than shards
 */
public List<Configuration> split(int adviceNumber) {
  List<Configuration> configurations = new ArrayList<Configuration>();
  List<String> shardIds = new ArrayList<String>(streamJob.getShardIds());
  int shardCount = shardIds.size();

  // At least one slave whenever shards exist, never more slaves than shards.
  // splitNumber is 0 only when shardCount is 0, which skips the block below
  // and so avoids the division by zero.
  int splitNumber = Math.min(Math.max(adviceNumber, 1), shardCount);

  if (splitNumber > 0) {
    Collections.shuffle(shardIds);

    // These values are identical for every slave, so serialize them once.
    String confJson = GsonParser.configToJson(conf);
    String streamJobJson = streamJob.toJson();
    String allShardsJson = GsonParser.toJson(allShards);

    int splitSize = shardCount / splitNumber;
    int remain = shardCount % splitNumber;
    int end = 0;
    for (int i = 0; i < splitNumber; i++) {
      int start = end;
      end = start + splitSize;
      // Spread the remainder over the first slices, one extra shard each.
      if (remain > 0) {
        end += 1;
        remain -= 1;
      }
      Configuration configuration = Configuration.newDefault();
      configuration.set(OTSStreamReaderConstants.CONF, confJson);
      configuration.set(OTSStreamReaderConstants.STREAM_JOB, streamJobJson);
      configuration.set(OTSStreamReaderConstants.ALL_SHARDS, allShardsJson);
      configuration.set(OTSStreamReaderConstants.OWNED_SHARDS, GsonParser.listToJson(shardIds.subList(start, end)));
      configurations.add(configuration);
    }
  }

  LOG.info("Master split to {} slave, with advice number {}.", configurations.size(), adviceNumber);
  return configurations;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * Splits the configured range into sub-ranges and builds one task
 * configuration per sub-range.
 *
 * <p>When the configuration holds a range split, the sub-ranges come from
 * {@code userDefinedRangeSplit}; otherwise from {@code defaultRangeSplit}
 * with {@code num} as argument. The range split stored in {@code this.conf}
 * is set to {@code null} before the configuration is serialized.
 *
 * @param num expected number of splits, passed to the default algorithm
 * @return one configuration per computed sub-range
 * @throws Exception propagated from the range-splitting helpers
 */
public List<Configuration> split(int num) throws Exception {
  LOG.info("Expect split num : " + num);

  List<OTSRange> splitRanges;
  if (this.conf.getRangeSplit() == null) {
    // No split ranges given: use the default splitting algorithm.
    LOG.info("Begin defaultRangeSplit");
    splitRanges = defaultRangeSplit(ots, meta, range, num);
    LOG.info("End defaultRangeSplit");
  } else {
    // The user explicitly specified the split ranges.
    LOG.info("Begin userDefinedRangeSplit");
    splitRanges = userDefinedRangeSplit(meta, range, this.conf.getRangeSplit());
    LOG.info("End userDefinedRangeSplit");
  }

  // Avoids the memory cost of serializing a large number of split points:
  // the slaves do not use this setting, so it is cleared.
  this.conf.setRangeSplit(null);

  List<Configuration> taskConfigs = new ArrayList<Configuration>();
  for (OTSRange splitRange : splitRanges) {
    Configuration taskConfig = Configuration.newDefault();
    taskConfig.set(OTSConst.OTS_CONF, GsonParser.confToJson(this.conf));
    taskConfig.set(OTSConst.OTS_RANGE, GsonParser.rangeToJson(splitRange));
    taskConfig.set(OTSConst.OTS_DIRECTION, GsonParser.directionToJson(direction));
    taskConfigs.add(taskConfig);
  }

  LOG.info("Configuration list count : " + taskConfigs.size());
  return taskConfigs;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * Parses every plugin directory under the reader home and then the writer
 * home, merging each non-null per-plugin configuration into one result.
 *
 * <p>When {@code wantPluginNames} is non-null and non-empty, the number of
 * merged plugin configurations must equal its size.
 *
 * @param wantPluginNames names of the plugins to load; may be {@code null}
 *     or empty
 * @return the merged plugin configuration
 * @throws RuntimeException the exception built by
 *     {@code DataXException.asDataXException} with
 *     {@code PLUGIN_INIT_ERROR} when a non-empty {@code wantPluginNames}
 *     differs in size from the number of merged plugins
 *     (NOTE(review): assumed unchecked since the method declares no
 *     {@code throws} clause - confirm)
 */
public static Configuration parsePluginConfig(List<String> wantPluginNames) {
  final Configuration merged = Configuration.newDefault();
  // Shared across readers and writers; handed to every parseOnePluginConfig call.
  final Set<String> seenPluginNames = new HashSet<String>();
  int loadedCount = 0;

  for (final String readerDir : ConfigParser
      .getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
    Configuration readerConf = ConfigParser.parseOnePluginConfig(readerDir, "reader", seenPluginNames, wantPluginNames);
    if (readerConf == null) {
      continue;
    }
    merged.merge(readerConf, true);
    loadedCount++;
  }

  for (final String writerDir : ConfigParser
      .getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
    Configuration writerConf = ConfigParser.parseOnePluginConfig(writerDir, "writer", seenPluginNames, wantPluginNames);
    if (writerConf == null) {
      continue;
    }
    merged.merge(writerConf, true);
    loadedCount++;
  }

  final boolean filterActive = wantPluginNames != null && wantPluginNames.size() > 0;
  if (filterActive && wantPluginNames.size() != loadedCount) {
    throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "插件加载失败,未完成指定插件加载:" + wantPluginNames);
  }
  return merged;
}

相关文章