Usage and code examples for the com.alibaba.datax.common.util.Configuration.set() method

x33g5p2x, reposted on 2022-01-18 under Other

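This article collects code examples for the Java method com.alibaba.datax.common.util.Configuration.set() and shows how Configuration.set() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they can serve as practical reference. Details of Configuration.set():
Package path: com.alibaba.datax.common.util.Configuration
Class name: Configuration
Method name: set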

About Configuration.set

Inserts the given object at the JSON path supplied by the caller and returns the object previously stored at that path, if any.

Only . and array-index addressing are currently supported, for example:

config.set("a.b.c[3]", object);

Configuration places no restriction on the inserted object, but make sure it is a simple object (Map and List included). Do not use custom objects; otherwise later operations such as JSON serialization have undefined behavior.
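The path addressing described above can be sketched on plain Map and List containers, without DataX on the classpath. This is only an illustration of the idea, not DataX's implementation: the class PathSetSketch and its methods are hypothetical names, and the sketch assumes that every intermediate value on the path is already a Map or List (or absent).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; this is not DataX's Configuration class.
final class PathSetSketch {

    /** Stores value at a path such as "a.b.c[3]" and returns the previous value, or null. */
    static Object set(Map<String, Object> root, String path, Object value) {
        // "a.b.c[3]" becomes the tokens a, b, c, [3]
        String[] tokens = path.replace("[", ".[").split("\\.");
        Object current = root;
        for (int i = 0; i < tokens.length - 1; i++) {
            // Create a List when the next token is an index, otherwise a Map.
            Object fresh = tokens[i + 1].startsWith("[")
                    ? new ArrayList<Object>() : new HashMap<String, Object>();
            Object next = read(current, tokens[i]);
            if (next == null) {
                write(current, tokens[i], fresh);
                next = fresh;
            }
            current = next;
        }
        return write(current, tokens[tokens.length - 1], value);
    }

    private static int index(String token) {
        return Integer.parseInt(token.substring(1, token.length() - 1));
    }

    @SuppressWarnings("unchecked")
    private static Object read(Object container, String token) {
        if (token.startsWith("[")) {
            List<Object> list = (List<Object>) container;
            int i = index(token);
            return i < list.size() ? list.get(i) : null;
        }
        return ((Map<String, Object>) container).get(token);
    }

    @SuppressWarnings("unchecked")
    private static Object write(Object container, String token, Object value) {
        if (token.startsWith("[")) {
            List<Object> list = (List<Object>) container;
            int i = index(token);
            while (list.size() <= i) {
                list.add(null); // pad with nulls up to the requested index
            }
            return list.set(i, value);
        }
        return ((Map<String, Object>) container).put(token, value);
    }

    public static void main(String[] args) {
        Map<String, Object> root = new HashMap<>();
        System.out.println(set(root, "a.b.c[3]", "first"));  // prints null
        System.out.println(set(root, "a.b.c[3]", "second")); // prints first
    }
}
```

Returning the previous value mirrors the documented contract of set; how DataX handles type conflicts or missing intermediate nodes is not covered here.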

Code examples

Code example source: ECNU-1X/DataX-Masking

private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
  int taskGroupNumber = taskGroupConfig.size();
  int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
  int remainderChannelCount = channelNumber % taskGroupNumber;
  // remainderChannelCount task groups each get avgChannelsPerTaskGroup + 1 channels;
  // the other (taskGroupNumber - remainderChannelCount) task groups each get avgChannelsPerTaskGroup channels.
  int i = 0;
  for (; i < remainderChannelCount; i++) {
    taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
  }
  for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {
    taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
  }
}
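The distribution rule in the example above is easier to check in isolation. The following is a minimal sketch of the same arithmetic on plain integers; ChannelSplitSketch and channelsPerGroup are hypothetical names, and the sketch assumes taskGroupNumber is greater than zero.

```java
import java.util.Arrays;

// Hypothetical sketch of the channel distribution arithmetic only.
final class ChannelSplitSketch {

    /** Returns how many channels each task group receives. */
    static int[] channelsPerGroup(int channelNumber, int taskGroupNumber) {
        int avg = channelNumber / taskGroupNumber;
        int remainder = channelNumber % taskGroupNumber;
        int[] result = new int[taskGroupNumber];
        for (int i = 0; i < taskGroupNumber; i++) {
            // The first `remainder` groups take one extra channel.
            result[i] = i < remainder ? avg + 1 : avg;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(channelsPerGroup(10, 3))); // prints [4, 3, 3]
    }
}
```

The per-group counts always sum to channelNumber, which is the property the two loops in the original method rely on.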

Code example source: ECNU-1X/DataX-Masking

public static void dealWhere(Configuration originalConfig) {
  String where = originalConfig.getString(Key.WHERE, null);
  if(StringUtils.isNotBlank(where)) {
    String whereImprove = where.trim();
    // Strip one trailing semicolon, either half-width (;) or full-width (;).
    if(whereImprove.endsWith(";") || whereImprove.endsWith(";")) {
      whereImprove = whereImprove.substring(0,whereImprove.length()-1);
    }
    originalConfig.set(Key.WHERE, whereImprove);
  }
}
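The clean-up step in dealWhere can be tried on a plain string. This is a sketch under the assumption that the intent is to drop a single trailing semicolon, half-width or full-width (U+FF1B); WhereCleanupSketch and stripTrailingSemicolon are hypothetical names.

```java
// Hypothetical sketch of the where-clause clean-up; not part of DataX.
final class WhereCleanupSketch {

    /** Trims the clause and removes one trailing semicolon; blank input is returned unchanged. */
    static String stripTrailingSemicolon(String where) {
        if (where == null || where.trim().isEmpty()) {
            return where;
        }
        String trimmed = where.trim();
        // "\uFF1B" is the full-width semicolon often typed with a Chinese input method.
        if (trimmed.endsWith(";") || trimmed.endsWith("\uFF1B")) {
            return trimmed.substring(0, trimmed.length() - 1);
        }
        return trimmed;
    }

    public static void main(String[] args) {
        System.out.println(stripTrailingSemicolon("  id > 5;  ")); // prints id > 5
    }
}
```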

Code example source: ECNU-1X/DataX-Masking

public List<Configuration> split(int mandatoryNumber){
  LOG.info("Begin split and MandatoryNumber : {}", mandatoryNumber);
  List<Configuration> configurations = new ArrayList<Configuration>();
  for (int i = 0; i < mandatoryNumber; i++) {
    Configuration configuration = Configuration.newDefault();
    configuration.set(OTSConst.OTS_CONF, GsonParser.confToJson(this.conf));
    configurations.add(configuration);
  }
  LOG.info("End split.");
  assert(mandatoryNumber == configurations.size());
  return configurations;
}

Code example source: ECNU-1X/DataX-Masking

private void dealSplitMode(Configuration originalConfig) {
  String splitMode = originalConfig.getString(Key.SPLIT_MODE, Constant.DEFAULT_SPLIT_MODE).trim();
  if (splitMode.equalsIgnoreCase(Constant.DEFAULT_SPLIT_MODE) ||
      splitMode.equalsIgnoreCase(Constant.PARTITION_SPLIT_MODE)) {
    originalConfig.set(Key.SPLIT_MODE, splitMode);
  } else {
    // The message says: the configured splitMode is invalid; only record or partition is allowed.
    throw DataXException.asDataXException(OdpsReaderErrorCode.SPLIT_MODE_ERROR,
        String.format("您所配置的 splitMode:%s 不正确. splitMode 仅允许配置为 record 或者 partition.", splitMode));
  }
}

Code example source: ECNU-1X/DataX-Masking

public static void doCheckBatchSize(Configuration originalConfig) {
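  // Check the batchSize setting (optional; the default is used when it is absent).
  int batchSize = originalConfig.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
  if (batchSize < 1) {
    // The message says: batchSize must not be less than 1; the recommended range is [100-1000],
    // and larger values raise the risk of running out of memory.
    throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE, String.format(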
        "您的batchSize配置有误. 您所配置的写入数据库表的 batchSize:%s 不能小于1. 推荐配置范围为:[100-1000], 该值越大, 内存溢出可能性越大. 请检查您的配置并作出修改.",
        batchSize));
  }
  originalConfig.set(Key.BATCH_SIZE, batchSize);
}
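Several examples on this page follow the same pattern: read a value with a default, validate it, then write the effective value back with set so that later stages never see a missing key. A minimal sketch of that pattern on a plain Map follows; BatchSizeSketch, the "batchSize" key, and the default of 2048 are assumptions made for illustration and are not taken from DataX.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the read-default, validate, write-back pattern.
final class BatchSizeSketch {

    static final int DEFAULT_BATCH_SIZE = 2048; // assumed default, for illustration only

    static void checkBatchSize(Map<String, Object> config) {
        Object raw = config.get("batchSize");
        int batchSize = raw == null ? DEFAULT_BATCH_SIZE : ((Number) raw).intValue();
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must not be less than 1, got " + batchSize);
        }
        // Write the effective value back so downstream code can read it without a default.
        config.put("batchSize", batchSize);
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        checkBatchSize(config);
        System.out.println(config.get("batchSize")); // prints 2048
    }
}
```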

Code example source: ECNU-1X/DataX-Masking

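/**
 * Removes the value at path; throws an exception if path does not exist.
 */
public Object remove(final String path) {
  final Object result = this.get(path);
  if (null == result) {
    // The message says: the key does not exist in the configuration, which indicates a programming error.
    throw DataXException.asDataXException(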
        CommonErrorCode.RUNTIME_ERROR,
        String.format("配置文件对应Key[%s]并不存在,该情况是代码编程错误. 请联系DataX团队的同学.", path));
  }
  this.set(path, null);
  return result;
}

Code example source: ECNU-1X/DataX-Masking

private void dealFetchSize(Configuration originalConfig) {
  int fetchSize = originalConfig.getInt(
      com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
      Constant.DEFAULT_FETCH_SIZE);
  if (fetchSize < 1) {
    // The message says: fetchSize is misconfigured; its value must not be less than 1.
    throw DataXException
        .asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
            String.format("您配置的 fetchSize 有误,fetchSize:[%d] 值不能小于 1.",
                fetchSize));
  }
  originalConfig.set(
      com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
      fetchSize);
}

Code example source: ECNU-1X/DataX-Masking

private void dealColumn(Configuration originalConfig, List<String> allColumns) {
  // An earlier check guarantees that userConfiguredColumns is not empty.
  List<String> userConfiguredColumns = originalConfig.getList(Key.COLUMN, String.class);
  if (1 == userConfiguredColumns.size() && "*".equals(userConfiguredColumns.get(0))) {
    userConfiguredColumns = allColumns;
    originalConfig.set(Key.COLUMN, allColumns);
  } else {
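    // Check for duplicate columns, case-insensitively (no write allows duplicated target columns).
    ListUtil.makeSureNoValueDuplicate(userConfiguredColumns, false);
    // Check that every configured column exists, case-insensitively.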
    ListUtil.makeSureBInA(allColumns, userConfiguredColumns, false);
  }
  List<Integer> columnPositions = OdpsUtil.parsePosition(allColumns, userConfiguredColumns);
  originalConfig.set(Constant.COLUMN_POSITION, columnPositions);
}

Code example source: ECNU-1X/DataX-Masking

public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
  originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase11xWriterErrorCode.REQUIRED_VALUE);
  originalConfig.getNecessaryValue(Key.TABLE, Hbase11xWriterErrorCode.REQUIRED_VALUE);
  Hbase11xHelper.validateMode(originalConfig);
  String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
  if (!Charset.isSupported(encoding)) {
    // The message says: Hbasewriter does not support the configured encoding.
    throw DataXException.asDataXException(Hbase11xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", encoding));
  }
  originalConfig.set(Key.ENCODING, encoding);
  Boolean walFlag = originalConfig.getBool(Key.WAL_FLAG, false);
  originalConfig.set(Key.WAL_FLAG, walFlag);
  long writeBufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE,Constant.DEFAULT_WRITE_BUFFER_SIZE);
  originalConfig.set(Key.WRITE_BUFFER_SIZE, writeBufferSize);
}

Code example source: ECNU-1X/DataX-Masking

public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
  originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase094xWriterErrorCode.REQUIRED_VALUE);
  originalConfig.getNecessaryValue(Key.TABLE, Hbase094xWriterErrorCode.REQUIRED_VALUE);
  Hbase094xHelper.validateMode(originalConfig);
  String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
  if (!Charset.isSupported(encoding)) {
    // The message says: Hbasewriter does not support the configured encoding.
    throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", encoding));
  }
  originalConfig.set(Key.ENCODING, encoding);
  Boolean autoFlush = originalConfig.getBool(Key.AUTO_FLUSH, false);
  // In this release autoFlush is always forced to false; the size of each flush is controlled through the HBase writeBufferSize.
  originalConfig.set(Key.AUTO_FLUSH,false);
  Boolean walFlag = originalConfig.getBool(Key.WAL_FLAG, false);
  originalConfig.set(Key.WAL_FLAG, walFlag);
  long writeBufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE,Constant.DEFAULT_WRITE_BUFFER_SIZE);
  originalConfig.set(Key.WRITE_BUFFER_SIZE, writeBufferSize);
}

Code example source: ECNU-1X/DataX-Masking

public static List<Configuration> doSplit(
  Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) {
  List<Configuration> confList = new ArrayList<Configuration>();
  String dbName = originalSliceConfig.getString(KeyConstant.MONGO_DB_NAME, originalSliceConfig.getString(KeyConstant.MONGO_DATABASE));
  String collName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
  if(Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collName) || mongoClient == null) {
    throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
      MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
  }
  boolean isObjectId = isPrimaryIdObjectId(mongoClient, dbName, collName);
  List<Range> rangeList = doSplitCollection(adviceNumber, mongoClient, dbName, collName, isObjectId);
  for(Range range : rangeList) {
    Configuration conf = originalSliceConfig.clone();
    conf.set(KeyConstant.LOWER_BOUND, range.lowerBound);
    conf.set(KeyConstant.UPPER_BOUND, range.upperBound);
    conf.set(KeyConstant.IS_OBJECTID, isObjectId);
    confList.add(conf);
  }
  return confList;
}
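The split above produces one configuration per range by cloning the original and calling set on the copy, so the slices never share state. A minimal sketch of that clone-then-set pattern on plain Maps follows; RangeSplitSketch and the key names are hypothetical, and the shallow HashMap copy used here is only adequate because the sketch stores flat values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the clone-then-set split pattern; not DataX code.
final class RangeSplitSketch {

    /** Builds one slice per [lower, upper] pair, leaving the base map untouched. */
    static List<Map<String, Object>> split(Map<String, Object> base, long[][] ranges) {
        List<Map<String, Object>> slices = new ArrayList<>();
        for (long[] range : ranges) {
            Map<String, Object> slice = new HashMap<>(base); // shallow copy, enough for flat values
            slice.put("lowerBound", range[0]);
            slice.put("upperBound", range[1]);
            slices.add(slice);
        }
        return slices;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("collection", "users");
        List<Map<String, Object>> slices = split(base, new long[][] {{0, 100}, {100, 200}});
        System.out.println(slices.size());                 // prints 2
        System.out.println(base.containsKey("lowerBound")); // prints false
    }
}
```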

Code example source: ECNU-1X/DataX-Masking

public static List<Configuration> doSplit(Configuration originalSliceConfig,
                     int adviceNumber) {
  boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
  int tableNumber = originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK);
  if (isTableMode && tableNumber == 1) {
    // First lift the inner table and connection settings to the outer level.
    String table = originalSliceConfig.getString(String.format("%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE)).trim();
    originalSliceConfig.set(Key.TABLE, table);
    // Note: jdbcUrl is not read from an array here, because the earlier master init method has already pre-processed it.
    String jdbcUrl = originalSliceConfig.getString(String.format("%s[0].%s", Constant.CONN_MARK, Key.JDBC_URL)).trim();
    originalSliceConfig.set(Key.JDBC_URL, DataBaseType.DRDS.appendJDBCSuffixForReader(jdbcUrl));
    originalSliceConfig.remove(Constant.CONN_MARK);
    return doDrdsReaderSplit(originalSliceConfig);
  } else {
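    // The message says: the table setting is wrong. Drdsreader only needs to read one logical table;
    // the DRDS Proxy fetches the data of the underlying physical tables automatically.
    throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "您的配置信息中的表(table)的配置有误. 因为Drdsreader 只需要读取一张逻辑表,后台会通过DRDS Proxy自动获取实际对应物理表的数据. 请检查您的配置并作出修改.");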
  }
}

Code example source: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.originalConfig = super.getPluginJobConf();
  String writeMode = this.originalConfig.getString(Key.WRITE_MODE, DEFAULT_WRITEMODE);
  if (!DEFAULT_WRITEMODE.equalsIgnoreCase(writeMode) &&
      !INSERT_IGNORE_WRITEMODE.equalsIgnoreCase(writeMode)) {
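    // The message says: writeMode is misconfigured; DRDSWriter supports only the two listed write modes.
    throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,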
        String.format("写入模式(writeMode)配置错误. DRDSWriter只支持两种写入模式为:[%s, %s], 但是您配置的写入模式为:%s. 请检查您的配置并作出修改.",
            DEFAULT_WRITEMODE, INSERT_IGNORE_WRITEMODE, writeMode));
  }
  this.originalConfig.set(Key.WRITE_MODE, writeMode);
  this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
  this.commonRdbmsWriterJob.init(this.originalConfig);
}

Code example source: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.originalConfig = super.getPluginJobConf();
  Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
  if (userConfigedFetchSize != null) {
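    // The warning says: mysqlreader does not need fetchSize and will ignore it; remove the setting to silence this warning.
    LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");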
  }
  this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
  this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
  this.commonRdbmsReaderJob.init(this.originalConfig);
}

Code example source: ECNU-1X/DataX-Masking

private void preCheckInit() {
  this.jobId = this.configuration.getLong(
      CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
  if (this.jobId < 0) {
    LOG.info("Set jobId = 0");
    this.jobId = 0;
    this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
        this.jobId);
  }
  Thread.currentThread().setName("job-" + this.jobId);
  JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
      this.getContainerCommunicator());
  this.jobReader = this.preCheckReaderInit(jobPluginCollector);
  this.jobWriter = this.preCheckWriterInit(jobPluginCollector);
}

Code example source: ECNU-1X/DataX-Masking

/**
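 * Preliminary processing of the configuration:
 * <ol>
 * <li>Handle the case where one database is configured with multiple jdbcUrl values</li>
 * <li>Detect and mark whether querySql mode or table mode is used</li>
 * <li>For table mode, determine the number of sharded tables and handle the column-to-* conversion</li>
 * </ol>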
 */
private static void simplifyConf(Configuration originalConfig) {
  boolean isTableMode = recognizeTableOrQuerySqlMode(originalConfig);
  originalConfig.set(Constant.IS_TABLE_MODE, isTableMode);
  dealJdbcAndTable(originalConfig);
  dealColumnConf(originalConfig);
}

Code example source: ECNU-1X/DataX-Masking

public static Configuration filterSensitiveConfiguration(Configuration configuration){
  Set<String> keys = configuration.getKeys();
  for (final String key : keys) {
    boolean isSensitive = StringUtils.endsWithIgnoreCase(key, "password")
        || StringUtils.endsWithIgnoreCase(key, "accessKey");
    if (isSensitive && configuration.get(key) instanceof String) {
      configuration.set(key, configuration.getString(key).replaceAll(".", "*"));
    }
  }
  return configuration;
}
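The masking rule above relies on replaceAll(".", "*"): the first argument is a regular expression, so . matches each character and the value is replaced by asterisks of the same length (line terminators excepted). A minimal sketch on a flat Map follows; MaskSketch and maskSensitive are hypothetical names, and unlike the original, which mutates the configuration in place, this sketch returns a copy.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of the key-suffix masking rule; not DataX code.
final class MaskSketch {

    /** Returns a copy in which String values under sensitive keys are replaced by asterisks. */
    static Map<String, Object> maskSensitive(Map<String, Object> flat) {
        Map<String, Object> masked = new LinkedHashMap<>(flat);
        for (Map.Entry<String, Object> entry : masked.entrySet()) {
            String key = entry.getKey().toLowerCase(Locale.ROOT);
            boolean sensitive = key.endsWith("password") || key.endsWith("accesskey");
            if (sensitive && entry.getValue() instanceof String) {
                // The regex "." matches every character, so the length is preserved.
                entry.setValue(((String) entry.getValue()).replaceAll(".", "*"));
            }
        }
        return masked;
    }

    public static void main(String[] args) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flat.put("reader.parameter.username", "root");
        flat.put("reader.parameter.password", "secret");
        System.out.println(maskSensitive(flat).get("reader.parameter.password")); // prints ******
    }
}
```

Preserving the length keeps the log output aligned, at the cost of revealing how long the secret is.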

Code example source: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.originalConfig = super.getPluginJobConf();
  int fetchSize = this.originalConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
      Constant.DEFAULT_FETCH_SIZE);
  if (fetchSize < 1) {
    // The message says: fetchSize is misconfigured; by DataX's design it must not be less than 1.
    throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
      String.format("您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.", fetchSize));
  }
  this.originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, fetchSize);
  this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE);
  this.commonRdbmsReaderMaster.init(this.originalConfig);
}

Code example source: ECNU-1X/DataX-Masking

public static String filterJobConfiguration(final Configuration configuration) {
  Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();
  Configuration jobContent = jobConfWithSetting.getConfiguration("content");
  filterSensitiveConfiguration(jobContent);
  jobConfWithSetting.set("content",jobContent);
  return jobConfWithSetting.beautify();
}

Code example source: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.originalConfig = super.getPluginJobConf();
  int fetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE,
      Integer.MIN_VALUE);
  this.originalConfig.set(Constant.FETCH_SIZE, fetchSize);
  this.validateConfiguration();
  this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(
      DATABASE_TYPE);
  this.commonRdbmsReaderJob.init(this.originalConfig);
}
