mysql reader json configure 8 connections ,per connetion configure 1 table ;
channel configure to 16 , fit code condition eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
when mysql table primary key use snowflake,data has billion and id has big range , the code can make huge size task, cause OOM; mysql table data size 100 million can make 1 million task
` int eachTableShouldSplittedNumber = -1;
if (isTableMode) {
// adviceNumber这里是channel数量大小, 即datax并发task数量
// eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经
eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
}
String column = originalSliceConfig.getString(Key.COLUMN);
String where = originalSliceConfig.getString(Key.WHERE, null);
List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);
List<Configuration> splittedConfigs = new ArrayList<Configuration>();
for (int i = 0, len = conns.size(); i < len; i++) {
Configuration sliceConfig = originalSliceConfig.clone();
Configuration connConf = Configuration.from(conns.get(i).toString());
String jdbcUrl = connConf.getString(Key.JDBC_URL);
sliceConfig.set(Key.JDBC_URL, jdbcUrl);
// 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));
sliceConfig.remove(Constant.CONN_MARK);
Configuration tempSlice;
// 说明是配置的 table 方式
if (isTableMode) {
// 已在之前进行了扩展和`处理,可以直接使用
List<String> tables = connConf.getList(Key.TABLE, String.class);
Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误.");
String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);
//最终切分份数不一定等于 eachTableShouldSplittedNumber
boolean needSplitTable = eachTableShouldSplittedNumber > 1
&& StringUtils.isNotBlank(splitPk);
if (needSplitTable) {
if (tables.size() == 1) {
//原来:如果是单表的,主键切分num=num*2+1
// splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑
//eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾
//考虑其他比率数字?(splitPk is null, 忽略此长尾)
eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
}
// 尝试对每个表,切分为eachTableShouldSplittedNumber 份
for (String table : tables) {
tempSlice = sliceConfig.clone();
tempSlice.set(Key.TABLE, table);
List<Configuration> splittedSlices = SingleTableSplitUtil
.splitSingleTable(tempSlice, eachTableShouldSplittedNumber);
splittedConfigs.addAll(splittedSlices);
}
} else {
for (String table : tables) {
tempSlice = sliceConfig.clone();
tempSlice.set(Key.TABLE, table);
String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
splittedConfigs.add(tempSlice);
}
}
} else {
// 说明是配置的 querySql 方式
List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);
// TODO 是否check 配置为多条语句??
for (String querySql : sqls) {
tempSlice = sliceConfig.clone();
tempSlice.set(Key.QUERY_SQL, querySql);
splittedConfigs.add(tempSlice);
}
}
}`
3条答案
按热度按时间am46iovg1#
the bug lastest code already fixed
3ks5zfa02#
lastes code also can cause such error
thigvfpy3#
怎么解决的啊请问