本文整理了Java中com.alibaba.datax.common.util.Configuration.set()方法的一些代码示例,展示了Configuration.set()的具体用法。
这些代码示例主要来源于GitHub、Stack Overflow、Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。
Configuration.set()方法的具体详情如下:
包路径:com.alibaba.datax.common.util.Configuration
类名称:Configuration
方法名:set
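[英]Inserts the given object at the JSON path supplied by the caller and returns the object previously stored at that path (if any).
Currently only "." and array-index addressing are supported, for example:
config.set("a.b.c[3]", object);
Configuration places no restriction on the inserted object, but make sure it is a simple object (including Map and List); do not use custom objects, otherwise later operations such as JSON serialization will have undefined behavior.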
[中]根据用户提供的json path,插入指定对象,并返回之前存在的对象(如果存在)
目前仅支持.以及数组下标寻址, 例如:
config.set("a.b.c[3]", object);
对于插入对象,Configuration不做任何限制,但是请务必保证该对象是简单对象(包括Map、List),不要使用自定义对象,否则后续对于JSON序列化等情况会出现未定义行为。
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Writes a per-task-group channel count into every task-group configuration so that
 * {@code channelNumber} channels are spread as evenly as possible.
 *
 * <p>The first {@code channelNumber % size} task groups receive one channel more than the
 * remaining ones.
 *
 * @param taskGroupConfig task-group configurations to update in place
 * @param channelNumber   total number of channels to distribute
 */
private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
    final int groupCount = taskGroupConfig.size();
    final int baseShare = channelNumber / groupCount;
    final int groupsWithExtra = channelNumber % groupCount;

    // Phase 1: the leading groups absorb the remainder, one extra channel each.
    int index = 0;
    while (index < groupsWithExtra) {
        taskGroupConfig.get(index).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, baseShare + 1);
        index++;
    }

    // Phase 2: the next (groupCount - groupsWithExtra) groups get the base share.
    final int end = index + (groupCount - groupsWithExtra);
    for (; index < end; index++) {
        taskGroupConfig.get(index).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, baseShare);
    }
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Normalizes the configured {@code where} clause: trims it and strips a single trailing
 * semicolon, then writes the result back under {@code Key.WHERE}.
 *
 * <p>Both the ASCII semicolon and the full-width semicolon (U+FF1B) are recognized.
 * Nothing is written when the clause is missing or blank.
 *
 * @param originalConfig configuration to read the where clause from and update in place
 */
public static void dealWhere(Configuration originalConfig) {
    String where = originalConfig.getString(Key.WHERE, null);
    if (StringUtils.isNotBlank(where)) {
        String whereImprove = where.trim();
        // The second alternative must be the full-width semicolon; testing the ASCII
        // one twice left clauses ending in U+FF1B untouched.
        if (whereImprove.endsWith(";") || whereImprove.endsWith("\uFF1B")) {
            whereImprove = whereImprove.substring(0, whereImprove.length() - 1);
        }
        originalConfig.set(Key.WHERE, whereImprove);
    }
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Builds {@code mandatoryNumber} fresh configurations, each holding the JSON form of
 * {@code this.conf} under {@code OTSConst.OTS_CONF}.
 *
 * @param mandatoryNumber number of configurations to produce
 * @return the list of generated configurations
 */
public List<Configuration> split(int mandatoryNumber) {
    LOG.info("Begin split and MandatoryNumber : {}", mandatoryNumber);

    List<Configuration> slices = new ArrayList<Configuration>();
    while (slices.size() < mandatoryNumber) {
        Configuration slice = Configuration.newDefault();
        slice.set(OTSConst.OTS_CONF, GsonParser.confToJson(this.conf));
        slices.add(slice);
    }

    LOG.info("End split.");
    assert (mandatoryNumber == slices.size());
    return slices;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Validates the configured split mode and writes the trimmed value back.
 *
 * <p>The mode defaults to {@code Constant.DEFAULT_SPLIT_MODE} and is compared
 * case-insensitively against the default and partition modes.
 *
 * @param originalConfig configuration to validate and update in place
 * @throws DataXException with {@code SPLIT_MODE_ERROR} when the mode is not one of the two
 */
private void dealSplitMode(Configuration originalConfig) {
    String configuredMode = originalConfig.getString(Key.SPLIT_MODE, Constant.DEFAULT_SPLIT_MODE).trim();

    boolean supported = configuredMode.equalsIgnoreCase(Constant.DEFAULT_SPLIT_MODE)
            || configuredMode.equalsIgnoreCase(Constant.PARTITION_SPLIT_MODE);
    if (!supported) {
        throw DataXException.asDataXException(OdpsReaderErrorCode.SPLIT_MODE_ERROR,
                String.format("您所配置的 splitMode:%s 不正确. splitMode 仅允许配置为 record 或者 partition.", configuredMode));
    }

    originalConfig.set(Key.SPLIT_MODE, configuredMode);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Checks the optional batch-size setting and writes the effective value back.
 *
 * <p>When the setting is absent, {@code Constant.DEFAULT_BATCH_SIZE} is used.
 *
 * @param originalConfig configuration to validate and update in place
 * @throws DataXException with {@code ILLEGAL_VALUE} when the batch size is below 1
 */
public static void doCheckBatchSize(Configuration originalConfig) {
    final int effectiveBatchSize = originalConfig.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);

    if (effectiveBatchSize >= 1) {
        originalConfig.set(Key.BATCH_SIZE, effectiveBatchSize);
        return;
    }

    throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE, String.format(
            "您的batchSize配置有误. 您所配置的写入数据库表的 batchSize:%s 不能小于1. 推荐配置范围为:[100-1000], 该值越大, 内存溢出可能性越大. 请检查您的配置并作出修改.",
            effectiveBatchSize));
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Clears the value stored at {@code path} by setting it to {@code null}.
 *
 * @param path path of the value to clear
 * @return the value that was stored at {@code path} before the call
 * @throws DataXException with {@code RUNTIME_ERROR} when nothing is stored at {@code path}
 */
public Object remove(final String path) {
    final Object previous = this.get(path);
    if (previous != null) {
        this.set(path, null);
        return previous;
    }

    throw DataXException.asDataXException(
            CommonErrorCode.RUNTIME_ERROR,
            String.format("配置文件对应Key[%s]并不存在,该情况是代码编程错误. 请联系DataX团队的同学.", path));
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Validates the fetch-size setting and writes the effective value back.
 *
 * <p>When the setting is absent, {@code Constant.DEFAULT_FETCH_SIZE} is used.
 *
 * @param originalConfig configuration to validate and update in place
 * @throws DataXException when the fetch size is below 1
 */
private void dealFetchSize(Configuration originalConfig) {
    final int effectiveFetchSize = originalConfig.getInt(
            com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
            Constant.DEFAULT_FETCH_SIZE);

    if (effectiveFetchSize >= 1) {
        originalConfig.set(
                com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
                effectiveFetchSize);
        return;
    }

    throw DataXException
            .asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
                    String.format("您配置的 fetchSize 有误,fetchSize:[%d] 值不能小于 1.",
                            effectiveFetchSize));
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Resolves the configured column list against the table's columns and stores the
 * resulting column positions under {@code Constant.COLUMN_POSITION}.
 *
 * <p>A single {@code "*"} entry is expanded to {@code allColumns} and written back under
 * {@code Key.COLUMN}. Any other list is checked, case-insensitively, for duplicates and
 * for columns missing from {@code allColumns}.
 *
 * @param originalConfig configuration to read from and update in place
 * @param allColumns     every column of the target table
 */
private void dealColumn(Configuration originalConfig, List<String> allColumns) {
    // Callers have already verified that the configured column list is not empty.
    List<String> selectedColumns = originalConfig.getList(Key.COLUMN, String.class);

    boolean isWildcard = 1 == selectedColumns.size() && "*".equals(selectedColumns.get(0));
    if (!isWildcard) {
        // Duplicate columns are never allowed on write; the check ignores case.
        ListUtil.makeSureNoValueDuplicate(selectedColumns, false);
        // Every configured column must exist in the table; the check ignores case.
        ListUtil.makeSureBInA(allColumns, selectedColumns, false);
    } else {
        selectedColumns = allColumns;
        originalConfig.set(Key.COLUMN, allColumns);
    }

    List<Integer> positions = OdpsUtil.parsePosition(allColumns, selectedColumns);
    originalConfig.set(Constant.COLUMN_POSITION, positions);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Validates the writer configuration and writes defaults back for optional settings.
 *
 * <p>Requires the HBase config and table entries, validates the mode, checks that the
 * encoding is supported, then stores the effective encoding, WAL flag and write-buffer size.
 *
 * @param originalConfig configuration to validate and update in place
 * @throws DataXException with {@code ILLEGAL_VALUE} when the encoding is not supported
 */
public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
    // Mandatory entries first, then the mode check.
    originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase11xWriterErrorCode.REQUIRED_VALUE);
    originalConfig.getNecessaryValue(Key.TABLE, Hbase11xWriterErrorCode.REQUIRED_VALUE);
    Hbase11xHelper.validateMode(originalConfig);

    String charsetName = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
    boolean charsetSupported = Charset.isSupported(charsetName);
    if (!charsetSupported) {
        throw DataXException.asDataXException(Hbase11xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", charsetName));
    }
    originalConfig.set(Key.ENCODING, charsetName);

    Boolean writeToWal = originalConfig.getBool(Key.WAL_FLAG, false);
    originalConfig.set(Key.WAL_FLAG, writeToWal);

    long bufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
    originalConfig.set(Key.WRITE_BUFFER_SIZE, bufferSize);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Validates the writer configuration and writes defaults back for optional settings.
 *
 * <p>Requires the HBase config and table entries, validates the mode, checks that the
 * encoding is supported, forces auto-flush off, then stores the effective WAL flag and
 * write-buffer size.
 *
 * @param originalConfig configuration to validate and update in place
 * @throws DataXException with {@code ILLEGAL_VALUE} when the encoding is not supported
 */
public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
    // Mandatory entries first, then the mode check.
    originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase094xWriterErrorCode.REQUIRED_VALUE);
    originalConfig.getNecessaryValue(Key.TABLE, Hbase094xWriterErrorCode.REQUIRED_VALUE);
    Hbase094xHelper.validateMode(originalConfig);

    String charsetName = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
    boolean charsetSupported = Charset.isSupported(charsetName);
    if (!charsetSupported) {
        throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", charsetName));
    }
    originalConfig.set(Key.ENCODING, charsetName);

    // The configured auto-flush value is read but never used.
    // NOTE(review): the read is kept in case getBool rejects malformed values - confirm before removing.
    originalConfig.getBool(Key.AUTO_FLUSH, false);
    // For now auto-flush is always forced to false; the HBase write-buffer size
    // controls how much is flushed each time.
    originalConfig.set(Key.AUTO_FLUSH, false);

    Boolean writeToWal = originalConfig.getBool(Key.WAL_FLAG, false);
    originalConfig.set(Key.WAL_FLAG, writeToWal);

    long bufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
    originalConfig.set(Key.WRITE_BUFFER_SIZE, bufferSize);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Splits a MongoDB read into one configuration per range of the collection.
 *
 * <p>Each returned configuration is a clone of {@code originalSliceConfig} carrying the
 * range's lower and upper bound and a flag telling whether the primary id is an ObjectId.
 *
 * @param originalSliceConfig configuration to clone for every range
 * @param adviceNumber        suggested number of splits, passed on to the range computation
 * @param mongoClient         client used to inspect the collection
 * @return one configuration per computed range
 * @throws DataXException with {@code ILLEGAL_VALUE} when the database name, the collection
 *                        name or the client is missing
 */
public static List<Configuration> doSplit(
        Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) {
    // The database name falls back to the MONGO_DATABASE entry.
    String fallbackDbName = originalSliceConfig.getString(KeyConstant.MONGO_DATABASE);
    String dbName = originalSliceConfig.getString(KeyConstant.MONGO_DB_NAME, fallbackDbName);
    String collName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);

    boolean inputIncomplete = Strings.isNullOrEmpty(dbName)
            || Strings.isNullOrEmpty(collName)
            || mongoClient == null;
    if (inputIncomplete) {
        throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
    }

    boolean isObjectId = isPrimaryIdObjectId(mongoClient, dbName, collName);
    List<Range> ranges = doSplitCollection(adviceNumber, mongoClient, dbName, collName, isObjectId);

    List<Configuration> slices = new ArrayList<Configuration>();
    for (Range current : ranges) {
        Configuration slice = originalSliceConfig.clone();
        slice.set(KeyConstant.LOWER_BOUND, current.lowerBound);
        slice.set(KeyConstant.UPPER_BOUND, current.upperBound);
        slice.set(KeyConstant.IS_OBJECTID, isObjectId);
        slices.add(slice);
    }
    return slices;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Splits a DRDS read job. Only table mode with exactly one table is accepted.
 *
 * <p>The table name and JDBC URL of the first connection entry are hoisted to the top
 * level of the configuration, the connection entry is removed, and the result is handed
 * to {@code doDrdsReaderSplit}.
 *
 * @param originalSliceConfig configuration to rewrite in place and split
 * @param adviceNumber        suggested number of splits (not used by this method)
 * @return the configurations produced by {@code doDrdsReaderSplit}
 * @throws DataXException with {@code CONF_ERROR} when the job is not in table mode or does
 *                        not configure exactly one table
 */
public static List<Configuration> doSplit(Configuration originalSliceConfig,
        int adviceNumber) {
    boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
    int tableNumber = originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK);

    if (!isTableMode || tableNumber != 1) {
        throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "您的配置信息中的表(table)的配置有误. 因为Drdsreader 只需要读取一张逻辑表,后台会通过DRDS Proxy自动获取实际对应物理表的数据. 请检查您的配置并作出修改.");
    }

    // Lift the inner table and connection settings to the outer level first.
    String tablePath = String.format("%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE);
    String table = originalSliceConfig.getString(tablePath).trim();
    originalSliceConfig.set(Key.TABLE, table);

    // Note: the jdbcUrl is not read from an array here, because the earlier
    // master init step has already pre-processed it.
    String jdbcUrlPath = String.format("%s[0].%s", Constant.CONN_MARK, Key.JDBC_URL);
    String jdbcUrl = originalSliceConfig.getString(jdbcUrlPath).trim();
    originalSliceConfig.set(Key.JDBC_URL, DataBaseType.DRDS.appendJDBCSuffixForReader(jdbcUrl));

    originalSliceConfig.remove(Constant.CONN_MARK);
    return doDrdsReaderSplit(originalSliceConfig);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Loads the plugin job configuration, validates the write mode and initializes the
 * shared RDBMS writer job.
 *
 * <p>The write mode defaults to {@code DEFAULT_WRITEMODE}; only that mode and
 * {@code INSERT_IGNORE_WRITEMODE} are accepted (case-insensitively).
 *
 * @throws DataXException with {@code CONF_ERROR} when another write mode is configured
 */
@Override
public void init() {
    this.originalConfig = super.getPluginJobConf();

    String writeMode = this.originalConfig.getString(Key.WRITE_MODE, DEFAULT_WRITEMODE);
    boolean supported = DEFAULT_WRITEMODE.equalsIgnoreCase(writeMode)
            || INSERT_IGNORE_WRITEMODE.equalsIgnoreCase(writeMode);
    if (!supported) {
        throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
                String.format("写入模式(writeMode)配置错误. DRDSWriter只支持两种写入模式为:[%s, %s], 但是您配置的写入模式为:%s. 请检查您的配置并作出修改.",
                        DEFAULT_WRITEMODE, INSERT_IGNORE_WRITEMODE, writeMode));
    }
    this.originalConfig.set(Key.WRITE_MODE, writeMode);

    this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
    this.commonRdbmsWriterJob.init(this.originalConfig);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Loads the plugin job configuration, overrides the fetch size and initializes the
 * shared RDBMS reader job.
 *
 * <p>A user-supplied fetch size only triggers a warning; the value is always replaced
 * with {@code Integer.MIN_VALUE}.
 */
@Override
public void init() {
    this.originalConfig = super.getPluginJobConf();

    boolean fetchSizeConfigured = this.originalConfig.getInt(Constant.FETCH_SIZE) != null;
    if (fetchSizeConfigured) {
        LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
    }

    // NOTE(review): Integer.MIN_VALUE presumably selects the MySQL driver's
    // streaming result-set mode - confirm against the driver documentation.
    this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);

    this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
    this.commonRdbmsReaderJob.init(this.originalConfig);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Prepares the job for the pre-check phase: resolves the job id, names the current
 * thread after it and initializes the reader and writer through their pre-check hooks.
 *
 * <p>A missing or negative job id is replaced with 0 and written back to the configuration.
 */
private void preCheckInit() {
    this.jobId = this.configuration.getLong(
            CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

    boolean jobIdMissing = this.jobId < 0;
    if (jobIdMissing) {
        LOG.info("Set jobId = 0");
        this.jobId = 0;
        this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                this.jobId);
    }

    String threadName = "job-" + this.jobId;
    Thread.currentThread().setName(threadName);

    // Reader and writer share one plugin collector.
    JobPluginCollector sharedCollector = new DefaultJobPluginCollector(
            this.getContainerCommunicator());
    this.jobReader = this.preCheckReaderInit(sharedCollector);
    this.jobWriter = this.preCheckWriterInit(sharedCollector);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Performs the initial processing of the configuration:
 * <ol>
 * <li>handle the case where several jdbcUrls are configured for one database</li>
 * <li>detect and record whether querySql mode or table mode is used</li>
 * <li>for table mode, determine the number of tables and handle converting column to *</li>
 * </ol>
 *
 * @param originalConfig configuration to process in place
 */
private static void simplifyConf(Configuration originalConfig) {
    // Record the detected mode before the follow-up steps run.
    originalConfig.set(Constant.IS_TABLE_MODE, recognizeTableOrQuerySqlMode(originalConfig));

    dealJdbcAndTable(originalConfig);

    dealColumnConf(originalConfig);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Masks sensitive string values in place.
 *
 * <p>A key counts as sensitive when it ends, case-insensitively, with {@code password}
 * or {@code accessKey}. Only values that are strings are rewritten, with every character
 * matched by the regex {@code .} replaced by {@code *}.
 *
 * @param configuration configuration to mask; it is modified directly
 * @return the same {@code configuration} instance
 */
public static Configuration filterSensitiveConfiguration(Configuration configuration) {
    for (final String key : configuration.getKeys()) {
        boolean sensitiveKey = StringUtils.endsWithIgnoreCase(key, "password")
                || StringUtils.endsWithIgnoreCase(key, "accessKey");
        if (!sensitiveKey) {
            continue;
        }
        if (!(configuration.get(key) instanceof String)) {
            continue;
        }
        String masked = configuration.getString(key).replaceAll(".", "*");
        configuration.set(key, masked);
    }
    return configuration;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Loads the plugin job configuration, validates the fetch size and initializes the
 * shared RDBMS reader job.
 *
 * <p>When the setting is absent, {@code Constant.DEFAULT_FETCH_SIZE} is used.
 *
 * @throws DataXException when the fetch size is below 1
 */
@Override
public void init() {
    this.originalConfig = super.getPluginJobConf();

    final int effectiveFetchSize = this.originalConfig.getInt(
            com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
            Constant.DEFAULT_FETCH_SIZE);
    boolean fetchSizeValid = effectiveFetchSize >= 1;
    if (!fetchSizeValid) {
        throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
                String.format("您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.", effectiveFetchSize));
    }
    this.originalConfig.set(
            com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
            effectiveFetchSize);

    this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE);
    this.commonRdbmsReaderMaster.init(this.originalConfig);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Renders the {@code job} section of the configuration with sensitive values in its
 * {@code content} section filtered.
 *
 * <p>The {@code job} section is cloned first, and the filtered content is written back
 * into that clone.
 *
 * @param configuration configuration holding the {@code job} section
 * @return the result of {@code beautify()} on the filtered clone
 */
public static String filterJobConfiguration(final Configuration configuration) {
    // Work on a clone of the job section.
    Configuration jobSectionCopy = configuration.getConfiguration("job").clone();

    Configuration contentSection = jobSectionCopy.getConfiguration("content");
    filterSensitiveConfiguration(contentSection);

    jobSectionCopy.set("content", contentSection);
    return jobSectionCopy.beautify();
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Loads the plugin job configuration, stores the effective fetch size, validates the
 * configuration and initializes the shared RDBMS reader job.
 *
 * <p>When no fetch size is configured, {@code Integer.MIN_VALUE} is written back.
 */
@Override
public void init() {
    this.originalConfig = super.getPluginJobConf();

    // Make the effective fetch size explicit before validation runs.
    final int effectiveFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE, Integer.MIN_VALUE);
    this.originalConfig.set(Constant.FETCH_SIZE, effectiveFetchSize);

    this.validateConfiguration();

    this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
    this.commonRdbmsReaderJob.init(this.originalConfig);
}
内容来源于网络,如有侵权,请联系作者删除!