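This article collects code examples for the Java class com.alibaba.datax.common.util.Configuration and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven; they were extracted from selected projects and are intended as a practical reference. Details of the Configuration class follow: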
Package path: com.alibaba.datax.common.util.Configuration
Class name: Configuration
Configuration provides lossless storage of multi-level JSON configuration information.
Example code:
Get the job's configuration:
Configuration configuration = Configuration.from(new File("Config.json"));
String jobContainerClass = configuration.getString("core.container.job.class");
Set a multi-level List:
configuration.set("job.reader.parameter.jdbcUrl", Arrays.asList(new String[] {"jdbc", "jdbc"}));
Merge another Configuration:
configuration.merge(another);
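The path-based access used in these examples can be pictured with a small stand-in built on nested maps. This is only a sketch: PathConfigSketch and its methods are hypothetical names, not the DataX class, and it handles dotted object keys only (no list indices, no merge).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for illustration; not the DataX Configuration class. */
final class PathConfigSketch {
    private final Map<String, Object> root = new LinkedHashMap<>();

    /** Walks the tree one path segment at a time; returns null if any segment is missing. */
    Object get(String path) {
        Object current = root;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    /** Creates intermediate maps as needed, then stores the value under the last segment. */
    @SuppressWarnings("unchecked")
    void set(String path, Object value) {
        String[] segments = path.split("\\.");
        Map<String, Object> current = root;
        for (int i = 0; i < segments.length - 1; i++) {
            Object child = current.get(segments[i]);
            if (!(child instanceof Map)) {
                child = new LinkedHashMap<String, Object>();
                current.put(segments[i], child);
            }
            current = (Map<String, Object>) child;
        }
        current.put(segments[segments.length - 1], value);
    }
}
```

Calling set("job.reader.parameter.jdbcUrl", ...) on an empty instance creates the job, reader, and parameter levels on the way down, which is the behaviour the multi-level List example relies on.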
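There are two reasonable ways to implement Configuration:
The first flattens every key in the JSON configuration, using cascaded paths of the form a.b.c as the keys of a single internal Map.
The second stores the JSON object directly as a structured tree.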
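To make the difference between the two approaches concrete, the sketch below converts a tree of nested maps (the second approach) into a single map keyed by dotted paths (the first approach). FlattenSketch is a hypothetical helper written for this illustration and is not part of DataX.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of DataX. */
final class FlattenSketch {
    /** Flattens a nested map into a single-level map keyed by dotted paths such as a.b.c. */
    static Map<String, Object> flatten(Map<String, Object> tree) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", tree, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<?, ?> node, Map<String, Object> out) {
        for (Map.Entry<?, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty()
                    ? String.valueOf(entry.getKey())
                    : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Descend into nested objects; only leaf values get a key.
                flattenInto(key, (Map<?, ?>) value, out);
            } else {
                out.put(key, value);
            }
        }
    }
}
```

In this particular sketch an empty nested object produces no key at all, so the flattened form cannot be turned back into the exact original tree.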
The second approach is the one currently used. The problem with the first approach is:
Code example source: ECNU-1X/DataX-Masking
public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase11xWriterErrorCode.REQUIRED_VALUE);
originalConfig.getNecessaryValue(Key.TABLE, Hbase11xWriterErrorCode.REQUIRED_VALUE);
Hbase11xHelper.validateMode(originalConfig);
String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
if (!Charset.isSupported(encoding)) {
throw DataXException.asDataXException(Hbase11xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter does not support the configured encoding: [%s]", encoding));
}
originalConfig.set(Key.ENCODING, encoding);
Boolean walFlag = originalConfig.getBool(Key.WAL_FLAG, false);
originalConfig.set(Key.WAL_FLAG, walFlag);
long writeBufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE,Constant.DEFAULT_WRITE_BUFFER_SIZE);
originalConfig.set(Key.WRITE_BUFFER_SIZE, writeBufferSize);
}
Code example source: ECNU-1X/DataX-Masking
@Override
public void init() {
this.taskConfig = super.getPluginJobConf();
this.sourceFiles = this.taskConfig.getList(Constant.SOURCE_FILES, String.class);
this.specifiedFileType = this.taskConfig.getNecessaryValue(Key.FILETYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
this.encoding = this.taskConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING, "UTF-8");
this.dfsUtil = new DFSUtil(this.taskConfig);
this.bufferSize = this.taskConfig.getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.BUFFER_SIZE,
com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_BUFFER_SIZE);
}
Code example source: ECNU-1X/DataX-Masking
public static String filterJobConfiguration(final Configuration configuration) {
Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();
Configuration jobContent = jobConfWithSetting.getConfiguration("content");
filterSensitiveConfiguration(jobContent);
jobConfWithSetting.set("content",jobContent);
return jobConfWithSetting.beautify();
}
Code example source: ECNU-1X/DataX-Masking
public ErrorRecordChecker(Configuration configuration) {
this(configuration.getLong(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_RECORD),
configuration.getDouble(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_PERCENT));
}
Code example source: ECNU-1X/DataX-Masking
public static AdsHelper createAdsHelper(Configuration originalConfig){
// Read adsUrl, userName, password, schema, and related parameters, then create the AdsHelper instance
String adsUrl = originalConfig.getString(Key.ADS_URL);
String userName = originalConfig.getString(Key.USERNAME);
String password = originalConfig.getString(Key.PASSWORD);
String schema = originalConfig.getString(Key.SCHEMA);
Long socketTimeout = originalConfig.getLong(Key.SOCKET_TIMEOUT, Constant.DEFAULT_SOCKET_TIMEOUT);
String suffix = originalConfig.getString(Key.JDBC_URL_SUFFIX, "");
return new AdsHelper(adsUrl,userName,password,schema,socketTimeout,suffix);
}
Code example source: ECNU-1X/DataX-Masking
public static List<Configuration> doSplit(
Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) {
List<Configuration> confList = new ArrayList<Configuration>();
String dbName = originalSliceConfig.getString(KeyConstant.MONGO_DB_NAME, originalSliceConfig.getString(KeyConstant.MONGO_DATABASE));
String collName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
if(Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collName) || mongoClient == null) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
}
boolean isObjectId = isPrimaryIdObjectId(mongoClient, dbName, collName);
List<Range> rangeList = doSplitCollection(adviceNumber, mongoClient, dbName, collName, isObjectId);
for(Range range : rangeList) {
Configuration conf = originalSliceConfig.clone();
conf.set(KeyConstant.LOWER_BOUND, range.lowerBound);
conf.set(KeyConstant.UPPER_BOUND, range.upperBound);
conf.set(KeyConstant.IS_OBJECTID, isObjectId);
confList.add(conf);
}
return confList;
}
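The pattern in doSplit, where each range gets its own copy of the configuration before the bounds are written, can be sketched with plain maps. SplitSketch and the key names lowerBound and upperBound are assumptions made for this illustration; the actual values behind KeyConstant are not shown in the source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; key names are assumed, not taken from DataX. */
final class SplitSketch {
    /** Returns one independent copy of base per range, each tagged with its own bounds. */
    static List<Map<String, Object>> split(Map<String, Object> base, long[][] ranges) {
        List<Map<String, Object>> slices = new ArrayList<>();
        for (long[] range : ranges) {
            // Copy first so that writing the bounds never touches base or another slice.
            // A shallow copy is enough here only because this sketch uses flat keys.
            Map<String, Object> slice = new HashMap<>(base);
            slice.put("lowerBound", range[0]);
            slice.put("upperBound", range[1]);
            slices.add(slice);
        }
        return slices;
    }
}
```

Without the copy, every slice would share one map and the last range written would overwrite the bounds of all earlier ones.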
Code example source: ECNU-1X/DataX-Masking
public static byte[] convertUserEndRowkey(com.alibaba.datax.common.util.Configuration configuration) {
String endRowkey = configuration.getString(Key.END_ROWKEY);
if (StringUtils.isBlank(endRowkey)) {
return HConstants.EMPTY_BYTE_ARRAY;
} else {
boolean isBinaryRowkey = configuration.getBool(Key.IS_BINARY_ROWKEY);
return Hbase11xHelper.stringToBytes(endRowkey, isBinaryRowkey);
}
}
Code example source: ECNU-1X/DataX-Masking
public static void doCheckBatchSize(Configuration originalConfig) {
// Check the batchSize setting (optional; the default value is used if it is not set)
int batchSize = originalConfig.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
if (batchSize < 1) {
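throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE, String.format(
"Your batchSize setting is invalid. The configured batchSize for writing to the database table, %s, must not be less than 1. The recommended range is [100-1000]; the larger the value, the greater the risk of running out of memory. Please check and correct your configuration.",
batchSize));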
}
originalConfig.set(Key.BATCH_SIZE, batchSize);
}
Code example source: ECNU-1X/DataX-Masking
public static void dealWhere(Configuration originalConfig) {
String where = originalConfig.getString(Key.WHERE, null);
if(StringUtils.isNotBlank(where)) {
String whereImprove = where.trim();
// Strip a trailing semicolon, either ASCII (;) or full-width (;)
if(whereImprove.endsWith(";") || whereImprove.endsWith(";")) {
whereImprove = whereImprove.substring(0,whereImprove.length()-1);
}
originalConfig.set(Key.WHERE, whereImprove);
}
}
Code example source: ECNU-1X/DataX-Masking
public HbaseAbstractTask(com.alibaba.datax.common.util.Configuration configuration) {
this.htable = Hbase094xHelper.getTable(configuration);
this.columns = configuration.getListConfiguration(Key.COLUMN);
this.rowkeyColumn = configuration.getListConfiguration(Key.ROWKEY_COLUMN);
this.versionColumn = configuration.getConfiguration(Key.VERSION_COLUMN);
this.encoding = configuration.getString(Key.ENCODING,Constant.DEFAULT_ENCODING);
this.nullMode = NullModeType.getByTypeName(configuration.getString(Key.NULL_MODE,Constant.DEFAULT_NULL_MODE));
this.walFlag = configuration.getBool(Key.WAL_FLAG, false);
}
Code example source: ECNU-1X/DataX-Masking
public TaskGroupContainer(Configuration configuration) {
super(configuration);
initCommunicator(configuration);
this.jobId = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
this.taskGroupId = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
this.channelClazz = this.configuration.getString(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
this.taskCollectorClass = this.configuration.getString(
CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
}
Code example source: ECNU-1X/DataX-Masking
public static Configuration filterSensitiveConfiguration(Configuration configuration){
Set<String> keys = configuration.getKeys();
for (final String key : keys) {
boolean isSensitive = StringUtils.endsWithIgnoreCase(key, "password")
|| StringUtils.endsWithIgnoreCase(key, "accessKey");
if (isSensitive && configuration.get(key) instanceof String) {
configuration.set(key, configuration.getString(key).replaceAll(".", "*"));
}
}
return configuration;
}
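The masking step above depends on a regex detail: in replaceAll, the pattern "." matches any single character (line terminators excepted by default), so each character of the value becomes one asterisk. The sketch below reproduces that logic on a flat map using only the standard library; MaskSketch is a hypothetical name, and the case-insensitive suffix test is written with toLowerCase rather than the StringUtils helper used in the original.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper for illustration; not part of DataX. */
final class MaskSketch {
    /** True when the key ends with "password" or "accessKey", ignoring case. */
    static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        return lower.endsWith("password") || lower.endsWith("accesskey");
    }

    /** Replaces every character matched by the regex "." with '*', preserving length. */
    static String mask(String value) {
        return value.replaceAll(".", "*");
    }

    /** Returns a copy of the map in which sensitive String values are masked. */
    static Map<String, Object> maskSensitive(Map<String, Object> flat) {
        Map<String, Object> masked = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : flat.entrySet()) {
            Object value = entry.getValue();
            boolean hide = isSensitive(entry.getKey()) && value instanceof String;
            masked.put(entry.getKey(), hide ? mask((String) value) : value);
        }
        return masked;
    }
}
```

Because the masked value keeps the original length, the output still reveals how long the secret is; a fixed-width placeholder would avoid that if it matters.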
Code example source: ECNU-1X/DataX-Masking
@Override
public String getPluginName() {
assert null != this.pluginConf;
return this.pluginConf.getString("name");
}
Code example source: ECNU-1X/DataX-Masking
public HbaseAbstractTask(com.alibaba.datax.common.util.Configuration configuration) {
this.htable = Hbase11xHelper.getTable(configuration);
this.encoding = configuration.getString(Key.ENCODING,Constant.DEFAULT_ENCODING);
this.startKey = Hbase11xHelper.convertInnerStartRowkey(configuration);
this.endKey = Hbase11xHelper.convertInnerEndRowkey(configuration);
this.scanCacheSize = configuration.getInt(Key.SCAN_CACHE_SIZE,Constant.DEFAULT_SCAN_CACHE_SIZE);
this.scanBatchSize = configuration.getInt(Key.SCAN_BATCH_SIZE,Constant.DEFAULT_SCAN_BATCH_SIZE);
}
Code example source: ECNU-1X/DataX-Masking
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
this.fieldDelimiter = this.writerSliceConfig.getString(
Key.FIELD_DELIMITER, "\t");
this.print = this.writerSliceConfig.getBool(Key.PRINT, true);
this.path = this.writerSliceConfig.getString(Key.PATH, null);
this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME, null);
this.recordNumBeforSleep = this.writerSliceConfig.getLong(Key.RECORD_NUM_BEFORE_SLEEP, 0);
this.sleepTime = this.writerSliceConfig.getLong(Key.SLEEP_TIME, 0);
if(recordNumBeforSleep < 0) {
throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "recordNumber must not be negative");
}
if(sleepTime <0) {
throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "sleep must not be negative");
}
}
Code example source: ECNU-1X/DataX-Masking
public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
String accessId = originalConfig.getString(Key.ACCESS_ID);
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
// If either accessId or accessKey is configured, assume the user intends to configure accessId/accessKey manually
if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
LOG.info("Try to get accessId/accessKey from your config.");
// The following calls check that both values are actually configured
accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsReaderErrorCode.REQUIRED_VALUE);
accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsReaderErrorCode.REQUIRED_VALUE);
// Check complete; return the configuration unchanged
return originalConfig;
} else {
Map<String, String> envProp = System.getenv();
return getAccessIdAndKeyFromEnv(originalConfig, envProp);
}
}
Code example source: ECNU-1X/DataX-Masking
public static void validateColumn(com.alibaba.datax.common.util.Configuration originalConfig){
List<Configuration> columns = originalConfig.getListConfiguration(Key.COLUMN);
if (columns == null || columns.isEmpty()) {
throw DataXException.asDataXException(Hbase094xWriterErrorCode.REQUIRED_VALUE, "column is required, in the form: column:[{\"index\": 0,\"name\": \"cf0:column0\",\"type\": \"string\"},{\"index\": 1,\"name\": \"cf1:column1\",\"type\": \"long\"}]");
}
for (Configuration aColumn : columns) {
Integer index = aColumn.getInt(Key.INDEX);
String type = aColumn.getNecessaryValue(Key.TYPE, Hbase094xWriterErrorCode.REQUIRED_VALUE);
String name = aColumn.getNecessaryValue(Key.NAME, Hbase094xWriterErrorCode.REQUIRED_VALUE);
ColumnType.getByTypeName(type);
if(name.split(":").length != 2){
throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, String.format("The column format [%s] given for name in your column setting is incorrect; name should take the form columnFamily:columnName, e.g. {\"index\": 1,\"name\": \"cf1:q1\",\"type\": \"long\"}", name));
}
if(index == null || index < 0){
throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, "Your column setting is incorrect: index is required and must be non-negative. Please check and correct it.");
}
}
}
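The name check in validateColumn relies on String.split, whose treatment of empty pieces is easy to overlook. The sketch below isolates that check and adds a stricter variant for comparison; ColumnNameSketch and both method names are hypothetical, and the stricter variant is an assumption about what a tighter check might look like, not something the original code does.

```java
/** Hypothetical helper for illustration; not part of DataX. */
final class ColumnNameSketch {
    /** Same test as the original: the name must split into exactly two parts on ':'. */
    static boolean splitsIntoTwo(String name) {
        return name.split(":").length == 2;
    }

    /** Stricter variant: exactly one ':' with at least one character on each side. */
    static boolean strictFamilyQualifier(String name) {
        int colon = name.indexOf(':');
        return colon > 0
                && colon < name.length() - 1
                && name.indexOf(':', colon + 1) < 0;
    }
}
```

With the split-based test, "cf1:" is rejected because split drops trailing empty strings, but ":q1" is accepted because a leading empty string is kept. The stricter variant rejects both.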
Code example source: ECNU-1X/DataX-Masking
/**
* Looks up an Integer by the user-supplied JSON path; if it does not exist, returns the default value.
*
* @return the Integer at the path, or the default value if the path or the Integer does not exist
*/
public Integer getInt(final String path, int defaultValue) {
Integer object = this.getInt(path);
if (null == object) {
return defaultValue;
}
return object;
}
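The two-step lookup above (a nullable getter, then a default-supplying overload) can be tried out against a plain map. DefaultValueSketch is a hypothetical stand-in; how the real getInt(path) converts stored values is not shown in the source, so the parsing here is an assumption.

```java
import java.util.Map;

/** Hypothetical stand-in for illustration; not the DataX Configuration class. */
final class DefaultValueSketch {
    /** Returns the value as an Integer, or null when the key is absent. */
    static Integer getInt(Map<String, Object> values, String path) {
        Object raw = values.get(path);
        if (raw == null) {
            return null;
        }
        // Assumption: values are integers or integer-formatted strings.
        return Integer.valueOf(String.valueOf(raw));
    }

    /** Falls back to defaultValue only when the nullable lookup finds nothing. */
    static Integer getInt(Map<String, Object> values, String path, int defaultValue) {
        Integer found = getInt(values, path);
        if (found == null) {
            return defaultValue;
        }
        return found;
    }
}
```

Returning the boxed Integer from the first overload is what lets the second one tell "missing" apart from a stored 0.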
Code example source: ECNU-1X/DataX-Masking
@Override
public void init() {
this.readerSliceConfig = super.getPluginJobConf();
this.columns = this.readerSliceConfig.getList(Key.COLUMN,
String.class);
this.sliceRecordCount = this.readerSliceConfig
.getLong(Key.SLICE_RECORD_COUNT);
this.haveMixupFunction = this.readerSliceConfig.getBool(
Constant.HAVE_MIXUP_FUNCTION, false);
}
Code example source: ECNU-1X/DataX-Masking
public MultiVersionTask(Configuration configuration) {
super(configuration);
this.maxVersion = configuration.getInt(Key.MAX_VERSION);
this.column = configuration.getList(Key.COLUMN, Map.class);
this.familyQualifierMap = Hbase11xHelper.parseColumnOfMultiversionMode(this.column);
try {
MultiVersionTask.COLON_BYTE = ":".getBytes("utf8");
} catch (UnsupportedEncodingException e) {
throw DataXException.asDataXException(Hbase11xReaderErrorCode.PREPAR_READ_ERROR, "Internal error: failed to get the bytes of the colon separator between column family and column name.", e);
}
}