Question
这个方法在Util.sqlRS中查询子表数据,但是当子表数据完全被删除时,由于查不到关联数据,导致更新主表的对应子属性失败
private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
Map<String, Object> old, TableItem tableItem) {
ESMapping mapping = config.getEsMapping();
MySqlSelectQueryBlock queryBlock = SqlParser.parseSQLSelectQueryBlock(tableItem.getSubQuerySql());
StringBuilder sql = new StringBuilder();
sql.append("SELECT ")
.append(SqlParser.parse4SQLSelectItem(queryBlock))
.append(" FROM ")
.append(SqlParser.parse4FromTableSource(queryBlock));
String whereSql = SqlParser.parse4WhereItem(queryBlock);
if (whereSql != null) {
sql.append(" WHERE ").append(whereSql);
} else {
sql.append(" WHERE 1=1 ");
}
List<Object> values = new ArrayList<>();
for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
String columnName = fkFieldItem.getColumn().getColumnName();
Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
sql.append(" AND ").append(columnName).append("=? ");
values.add(value);
}
String groupSql = SqlParser.parse4GroupBy(queryBlock);
if (groupSql != null) {
sql.append(groupSql);
}
DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
if (logger.isTraceEnabled()) {
logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
config.getDestination(),
dml.getTable(),
mapping.get_index(),
sql.toString().replace("\n", " "));
}
Util.sqlRS(ds, sql.toString(), values, rs -> {
try {
while (rs.next()) {
Map<String, Object> esFieldData = new LinkedHashMap<>();
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
if (old != null) {
out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
if (old.containsKey(Util.cleanColumn(columnItem.getColumnName()))) {
Object val = esTemplate.getValFromRS(mapping,
rs,
fieldItem.getFieldName(),
fieldItem.getColumn().getColumnName());
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
break out;
}
}
}
}
} else {
Object val = esTemplate.getValFromRS(mapping,
rs,
fieldItem.getFieldName(),
fieldItem.getColumn().getColumnName());
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
}
}
Map<String, Object> paramsTmp = new LinkedHashMap<>();
for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
for (FieldItem fieldItem : entry.getValue()) {
if (fieldItem.getColumnItems().size() == 1) {
Object value = esTemplate.getValFromRS(mapping,
rs,
fieldItem.getFieldName(),
entry.getKey().getColumn().getColumnName());
String fieldName = fieldItem.getFieldName();
// 判断是否是主键
if (fieldName.equals(mapping.get_id())) {
fieldName = "_id";
}
paramsTmp.put(fieldName, value);
}
}
}
if (logger.isDebugEnabled()) {
logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
config.getDestination(),
dml.getTable(),
mapping.get_index());
}
esTemplate.updateByQuery(config, paramsTmp, esFieldData);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return 0;
});
}
9条答案
按热度按时间qltillow1#
假如:
A表数据:
id name
1 grag
B表数据:
id range p_id
1 fei 1
2 jiang 1
当同步sql为:select a.*,b.range from A left join B on A.id = B.p_id
在es中,B的range数据作为A索引的属性
当删除B中最后一条与a关联的数据,不会同步更新a中的range属性值
ma8fv8wu2#
我canal1.1.6也是有这种问题,找到解决方法没呢?
ht4b089n3#
我canal1.1.6也是有这种问题,找到解决方法没呢?
我理解这是个bug,着急解决的话,可以修改源码,源码里判断一下,如果查不到子表数据,就将主表属性设置为null,
@1512033796
zbsbpyhn4#
我canal1.1.6也是有这种问题,找到解决方法没呢?
我理解这是个bug,着急解决的话,可以修改源码,源码里判断一下,如果查不到子表数据,就将主表属性设置为null, @1512033796
大佬知道改那个地方嘛?
irlmq6kh5#
}
@1512033796 在代码 while (rs.next()) 这个地方判断
whhtz7ly6#
但是不优雅,不知道作者有没有好的建议
byqmnocz7#
但是不优雅,不知道作者有没有好的建议
大佬直接pr啊
euoag5mw8#
我canal1.1.5也是有这种问题,找到解决方法没呢?
nwsw7zdq9#
我canal1.1.5也是有这种问题,找到解决方法没呢?
解决方法就是尽量在数据库中弄一个单表数据同步到es,因为你用子表写法,不但会出现上面的问题,同步的效率还很慢。我主表子表都是80万数据,修改子表数据时,单表同步可以实现到1秒内,子表同步至少需要10秒,还是没有算消息拥堵的情况。