com.alibaba.datax.common.util.Configuration类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(13.1k)|赞(0)|评价(0)|浏览(394)

本文整理了Java中com.alibaba.datax.common.util.Configuration类的一些代码示例,展示了Configuration类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration类的具体详情如下:
包路径:com.alibaba.datax.common.util.Configuration
类名称:Configuration

Configuration介绍

[英]Configuration 提供多级JSON配置信息无损存储

实例代码:

获取job的配置信息
Configuration configuration = Configuration.from(new File("Config.json"));
String jobContainerClass = configuration.getString("core.container.job.class");

设置多级List
configuration.set("job.reader.parameter.jdbcUrl", Arrays.asList(new String[] {"jdbc", "jdbc"}));

合并Configuration:
configuration.merge(another);

Configuration 存在两种较好地实现方式
第一种是将JSON配置信息中所有的Key全部打平,用a.b.c的级联方式作为Map的Key,内部使用一个Map保存信息
第二种是将JSON的对象直接使用结构化树形结构保存

目前使用的第二种实现方式,使用第一种的问题在于:

  1. 插入新对象,比较难处理,例如a.b.c="bazhen",此时如果需要插入a="bazhen",也即是根目录下第一层所有类型全部要废弃 ,使用"bazhen"作为value,第一种方式使用字符串表示key,难以处理这类问题。
  2. 返回树形结构,例如 a.b.c.d = "bazhen",如果返回"a"下的所有元素,实际上是一个Map,需要合并处理
  3. 输出JSON,将上述对象转为JSON,要把上述Map的多级key转为树形结构,并输出为JSON
    [中]配置提供多级JSON配置信息无损存储
    实例代码:
    获取工作的配置信息
    配置=配置。来自(新文件(“Config.json”);
    字符串jobContainerClass=配置。getString(“core.container.job.class”);
    设置多级列表
    配置set(“job.reader.parameter.jdbcUrl”,Arrays.asList(新字符串[]{“jdbc”,“jdbc”}));
    合并配置:
    配置合并(另一个);
    配置存在两种较好地实现方式
    第一种是将JSON配置信息中所有的钥匙全部打平,用a、 不列颠哥伦比亚省的级联方式作为地图的钥匙内部使用一个地图保存信息
    第二种是将JSON的对象直接使用结构化树形结构保存
    目前使用的第二种实现方式,使用第一种的问题在于:
    1.插入新对象,比较难处理,例如a、 b.c=“八珍”此时如果需要插入a=“八珍”也即是根目录下第一层所有类型全部要废弃 ,使用“八珍”作为价值第一种方式使用字符串表示钥匙难以处理这类问题。
    2.返回树形结构,例如 a、 b.c.d=“八珍”如果返回“a”下的所有元素,实际上是一个地图需要合并处理
    3.输出JSON将上述对象转为JSON要把上述地图的多级钥匙转为树形结构,并输出为JSON

代码示例

代码示例来源:origin: ECNU-1X/DataX-Masking

public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
  originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase11xWriterErrorCode.REQUIRED_VALUE);
  originalConfig.getNecessaryValue(Key.TABLE, Hbase11xWriterErrorCode.REQUIRED_VALUE);
  Hbase11xHelper.validateMode(originalConfig);
  String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
  if (!Charset.isSupported(encoding)) {
    throw DataXException.asDataXException(Hbase11xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", encoding));
  }
  originalConfig.set(Key.ENCODING, encoding);
  Boolean walFlag = originalConfig.getBool(Key.WAL_FLAG, false);
  originalConfig.set(Key.WAL_FLAG, walFlag);
  long writeBufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE,Constant.DEFAULT_WRITE_BUFFER_SIZE);
  originalConfig.set(Key.WRITE_BUFFER_SIZE, writeBufferSize);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.taskConfig = super.getPluginJobConf();
  this.sourceFiles = this.taskConfig.getList(Constant.SOURCE_FILES, String.class);
  this.specifiedFileType = this.taskConfig.getNecessaryValue(Key.FILETYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
  this.encoding = this.taskConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING, "UTF-8");
  this.dfsUtil = new DFSUtil(this.taskConfig);
  this.bufferSize = this.taskConfig.getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.BUFFER_SIZE,
      com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_BUFFER_SIZE);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static String filterJobConfiguration(final Configuration configuration) {
  Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();
  Configuration jobContent = jobConfWithSetting.getConfiguration("content");
  filterSensitiveConfiguration(jobContent);
  jobConfWithSetting.set("content",jobContent);
  return jobConfWithSetting.beautify();
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public ErrorRecordChecker(Configuration configuration) {
  this(configuration.getLong(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_RECORD),
      configuration.getDouble(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_PERCENT));
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static AdsHelper createAdsHelper(Configuration originalConfig){
  //Get adsUrl,userName,password,schema等参数,创建AdsHelp实例
  String adsUrl = originalConfig.getString(Key.ADS_URL);
  String userName = originalConfig.getString(Key.USERNAME);
  String password = originalConfig.getString(Key.PASSWORD);
  String schema = originalConfig.getString(Key.SCHEMA);
  Long socketTimeout = originalConfig.getLong(Key.SOCKET_TIMEOUT, Constant.DEFAULT_SOCKET_TIMEOUT);
  String suffix = originalConfig.getString(Key.JDBC_URL_SUFFIX, "");
  return new AdsHelper(adsUrl,userName,password,schema,socketTimeout,suffix);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static List<Configuration> doSplit(
  Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) {
  List<Configuration> confList = new ArrayList<Configuration>();
  String dbName = originalSliceConfig.getString(KeyConstant.MONGO_DB_NAME, originalSliceConfig.getString(KeyConstant.MONGO_DATABASE));
  String collName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
  if(Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collName) || mongoClient == null) {
    throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
      MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
  }
  boolean isObjectId = isPrimaryIdObjectId(mongoClient, dbName, collName);
  List<Range> rangeList = doSplitCollection(adviceNumber, mongoClient, dbName, collName, isObjectId);
  for(Range range : rangeList) {
    Configuration conf = originalSliceConfig.clone();
    conf.set(KeyConstant.LOWER_BOUND, range.lowerBound);
    conf.set(KeyConstant.UPPER_BOUND, range.upperBound);
    conf.set(KeyConstant.IS_OBJECTID, isObjectId);
    confList.add(conf);
  }
  return confList;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static byte[] convertUserEndRowkey(com.alibaba.datax.common.util.Configuration configuration) {
  String endRowkey = configuration.getString(Key.END_ROWKEY);
  if (StringUtils.isBlank(endRowkey)) {
    return HConstants.EMPTY_BYTE_ARRAY;
  } else {
    boolean isBinaryRowkey = configuration.getBool(Key.IS_BINARY_ROWKEY);
    return Hbase11xHelper.stringToBytes(endRowkey, isBinaryRowkey);
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static void doCheckBatchSize(Configuration originalConfig) {
  // 检查batchSize 配置(选填,如果未填写,则设置为默认值)
  int batchSize = originalConfig.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
  if (batchSize < 1) {
    throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE, String.format(
        "您的batchSize配置有误. 您所配置的写入数据库表的 batchSize:%s 不能小于1. 推荐配置范围为:[100-1000], 该值越大, 内存溢出可能性越大. 请检查您的配置并作出修改.",
        batchSize));
  }
  originalConfig.set(Key.BATCH_SIZE, batchSize);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static void dealWhere(Configuration originalConfig) {
  String where = originalConfig.getString(Key.WHERE, null);
  if(StringUtils.isNotBlank(where)) {
    String whereImprove = where.trim();
    if(whereImprove.endsWith(";") || whereImprove.endsWith(";")) {
      whereImprove = whereImprove.substring(0,whereImprove.length()-1);
    }
    originalConfig.set(Key.WHERE, whereImprove);
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public HbaseAbstractTask(com.alibaba.datax.common.util.Configuration configuration) {
  this.htable = Hbase094xHelper.getTable(configuration);
  this.columns = configuration.getListConfiguration(Key.COLUMN);
  this.rowkeyColumn = configuration.getListConfiguration(Key.ROWKEY_COLUMN);
  this.versionColumn = configuration.getConfiguration(Key.VERSION_COLUMN);
  this.encoding = configuration.getString(Key.ENCODING,Constant.DEFAULT_ENCODING);
  this.nullMode = NullModeType.getByTypeName(configuration.getString(Key.NULL_MODE,Constant.DEFAULT_NULL_MODE));
  this.walFlag = configuration.getBool(Key.WAL_FLAG, false);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public TaskGroupContainer(Configuration configuration) {
  super(configuration);
  initCommunicator(configuration);
  this.jobId = this.configuration.getLong(
      CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
  this.taskGroupId = this.configuration.getInt(
      CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
  this.channelClazz = this.configuration.getString(
      CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
  this.taskCollectorClass = this.configuration.getString(
      CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static Configuration filterSensitiveConfiguration(Configuration configuration){
  Set<String> keys = configuration.getKeys();
  for (final String key : keys) {
    boolean isSensitive = StringUtils.endsWithIgnoreCase(key, "password")
        || StringUtils.endsWithIgnoreCase(key, "accessKey");
    if (isSensitive && configuration.get(key) instanceof String) {
      configuration.set(key, configuration.getString(key).replaceAll(".", "*"));
    }
  }
  return configuration;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public String getPluginName() {
  assert null != this.pluginConf;
  return this.pluginConf.getString("name");
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public HbaseAbstractTask(com.alibaba.datax.common.util.Configuration configuration) {
  this.htable = Hbase11xHelper.getTable(configuration);
  this.encoding = configuration.getString(Key.ENCODING,Constant.DEFAULT_ENCODING);
  this.startKey = Hbase11xHelper.convertInnerStartRowkey(configuration);
  this.endKey =  Hbase11xHelper.convertInnerEndRowkey(configuration);
  this.scanCacheSize = configuration.getInt(Key.SCAN_CACHE_SIZE,Constant.DEFAULT_SCAN_CACHE_SIZE);
  this.scanBatchSize = configuration.getInt(Key.SCAN_BATCH_SIZE,Constant.DEFAULT_SCAN_BATCH_SIZE);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.writerSliceConfig = getPluginJobConf();
  this.fieldDelimiter = this.writerSliceConfig.getString(
      Key.FIELD_DELIMITER, "\t");
  this.print = this.writerSliceConfig.getBool(Key.PRINT, true);
  this.path = this.writerSliceConfig.getString(Key.PATH, null);
  this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME, null);
  this.recordNumBeforSleep = this.writerSliceConfig.getLong(Key.RECORD_NUM_BEFORE_SLEEP, 0);
  this.sleepTime = this.writerSliceConfig.getLong(Key.SLEEP_TIME, 0);
  if(recordNumBeforSleep < 0) {
    throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "recordNumber 不能为负值");
  }
  if(sleepTime <0) {
    throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "sleep 不能为负值");
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
  String accessId = originalConfig.getString(Key.ACCESS_ID);
  String accessKey = originalConfig.getString(Key.ACCESS_KEY);
  // 只要 accessId,accessKey 二者配置了一个,就理解为是用户本意是要直接手动配置其 accessid/accessKey
  if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
    LOG.info("Try to get accessId/accessKey from your config.");
    //通过如下语句,进行检查是否确实配置了
    accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsReaderErrorCode.REQUIRED_VALUE);
    accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsReaderErrorCode.REQUIRED_VALUE);
    //检查完毕,返回即可
    return originalConfig;
  } else {
    Map<String, String> envProp = System.getenv();
    return getAccessIdAndKeyFromEnv(originalConfig, envProp);
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static void validateColumn(com.alibaba.datax.common.util.Configuration originalConfig){
  List<Configuration> columns = originalConfig.getListConfiguration(Key.COLUMN);
  if (columns == null || columns.isEmpty()) {
    throw DataXException.asDataXException(Hbase094xWriterErrorCode.REQUIRED_VALUE, "column为必填项,其形式为:column:[{\"index\": 0,\"name\": \"cf0:column0\",\"type\": \"string\"},{\"index\": 1,\"name\": \"cf1:column1\",\"type\": \"long\"}]");
  }
  for (Configuration aColumn : columns) {
    Integer index = aColumn.getInt(Key.INDEX);
    String type = aColumn.getNecessaryValue(Key.TYPE, Hbase094xWriterErrorCode.REQUIRED_VALUE);
    String name = aColumn.getNecessaryValue(Key.NAME, Hbase094xWriterErrorCode.REQUIRED_VALUE);
    ColumnType.getByTypeName(type);
    if(name.split(":").length != 2){
      throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, String.format("您column配置项中name配置的列格式[%s]不正确,name应该配置为 列族:列名  的形式, 如 {\"index\": 1,\"name\": \"cf1:q1\",\"type\": \"long\"}", name));
    }
    if(index == null || index < 0){
      throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, "您的column配置项不正确,配置项中中index为必填项,且为非负数,请检查并修改.");
    }
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * 根据用户提供的json path,寻址Integer对象,如果对象不存在,返回默认Integer对象
 * 
 * @return Integer对象,如果path不存在或者Integer不存在,返回默认Integer对象
 */
public Integer getInt(final String path, int defaultValue) {
  Integer object = this.getInt(path);
  if (null == object) {
    return defaultValue;
  }
  return object;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.readerSliceConfig = super.getPluginJobConf();
  this.columns = this.readerSliceConfig.getList(Key.COLUMN,
      String.class);
  this.sliceRecordCount = this.readerSliceConfig
      .getLong(Key.SLICE_RECORD_COUNT);
  this.haveMixupFunction = this.readerSliceConfig.getBool(
      Constant.HAVE_MIXUP_FUNCTION, false);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public MultiVersionTask(Configuration configuration) {
  super(configuration);
  this.maxVersion = configuration.getInt(Key.MAX_VERSION);
  this.column = configuration.getList(Key.COLUMN, Map.class);
  this.familyQualifierMap = Hbase11xHelper.parseColumnOfMultiversionMode(this.column);
  try {
    MultiVersionTask.COLON_BYTE = ":".getBytes("utf8");
  } catch (UnsupportedEncodingException e) {
    throw DataXException.asDataXException(Hbase11xReaderErrorCode.PREPAR_READ_ERROR, "系统内部获取 列族与列名冒号分隔符的二进制时失败.", e);
  }
}

相关文章