canal在mysql同步es中,当A表left join B on A.id=B.p_id 场景下,删除子表B的最后一条数据,不会同步更新A里面的数据

x33g5p2x  于 4个月前  发布在  Mysql
关注(0)|答案(9)|浏览(63)

Question

这个方法在Util.sqlRS中查询子表数据,但是当子表数据完全被删除时,由于查不到关联数据,导致更新主表的对应子属性失败

private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
Map<String, Object> old, TableItem tableItem) {
ESMapping mapping = config.getEsMapping();

MySqlSelectQueryBlock queryBlock = SqlParser.parseSQLSelectQueryBlock(tableItem.getSubQuerySql());
    StringBuilder sql = new StringBuilder();
    sql.append("SELECT ")
        .append(SqlParser.parse4SQLSelectItem(queryBlock))
        .append(" FROM ")
        .append(SqlParser.parse4FromTableSource(queryBlock));

    String whereSql = SqlParser.parse4WhereItem(queryBlock);
    if (whereSql != null) {
        sql.append(" WHERE ").append(whereSql);
    } else {
        sql.append(" WHERE 1=1 ");
    }

    List<Object> values = new ArrayList<>();

    for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
        String columnName = fkFieldItem.getColumn().getColumnName();
        Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
        sql.append(" AND ").append(columnName).append("=? ");
        values.add(value);
    }

    String groupSql = SqlParser.parse4GroupBy(queryBlock);
    if (groupSql != null) {
        sql.append(groupSql);
    }

    DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    if (logger.isTraceEnabled()) {
        logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
            config.getDestination(),
            dml.getTable(),
            mapping.get_index(),
            sql.toString().replace("\n", " "));
    }
    Util.sqlRS(ds, sql.toString(), values, rs -> {
        try {
            while (rs.next()) {
                Map<String, Object> esFieldData = new LinkedHashMap<>();

                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                    if (old != null) {
                        out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
                            for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
                                if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
                                    for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                        if (old.containsKey(Util.cleanColumn(columnItem.getColumnName()))) {
                                            Object val = esTemplate.getValFromRS(mapping,
                                                rs,
                                                fieldItem.getFieldName(),
                                                fieldItem.getColumn().getColumnName());
                                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                            break out;
                                        }
                                    }
                            }
                        }
                    } else {
                        Object val = esTemplate.getValFromRS(mapping,
                            rs,
                            fieldItem.getFieldName(),
                            fieldItem.getColumn().getColumnName());
                        esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                    }
                }

                Map<String, Object> paramsTmp = new LinkedHashMap<>();
                for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                    for (FieldItem fieldItem : entry.getValue()) {
                        if (fieldItem.getColumnItems().size() == 1) {
                            Object value = esTemplate.getValFromRS(mapping,
                                rs,
                                fieldItem.getFieldName(),
                                entry.getKey().getColumn().getColumnName());
                            String fieldName = fieldItem.getFieldName();
                            // 判断是否是主键
                            if (fieldName.equals(mapping.get_id())) {
                                fieldName = "_id";
                            }
                            paramsTmp.put(fieldName, value);
                        }
                    }
                }

                if (logger.isDebugEnabled()) {
                    logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
                        config.getDestination(),
                        dml.getTable(),
                        mapping.get_index());
                }
                esTemplate.updateByQuery(config, paramsTmp, esFieldData);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return 0;
    });
}
qltillow

qltillow1#

假如:
A表数据:
id name
1 grag
B表数据:
id range p_id
1 fei 1
2 jiang 1

当同步sql为:select a.*,b.range from A left join B on A.id = B.p_id
在es中,B的range数据作为A索引的属性
当删除B中最后一条与a关联的数据,不会同步更新a中的range属性值

ma8fv8wu

ma8fv8wu2#

我canal1.1.6也是有这种问题,找到解决方法没呢?

ht4b089n

ht4b089n3#

我canal1.1.6也是有这种问题,找到解决方法没呢?

我理解这是个bug,着急解决的话,可以修改源码,源码里判断一下,如果查不到子表数据,就将主表属性设置为null,
@1512033796

zbsbpyhn

zbsbpyhn4#

我canal1.1.6也是有这种问题,找到解决方法没呢?

我理解这是个bug,着急解决的话,可以修改源码,源码里判断一下,如果查不到子表数据,就将主表属性设置为null, @1512033796

大佬知道改那个地方嘛?

irlmq6kh

irlmq6kh5#

MySqlSelectQueryBlock queryBlock = SqlParser.parseSQLSelectQueryBlock(tableItem.getSubQuerySql());
StringBuilder sql = new StringBuilder();
sql.append("SELECT ")
    .append(SqlParser.parse4SQLSelectItem(queryBlock))
    .append(" FROM ")
    .append(SqlParser.parse4FromTableSource(queryBlock));

String whereSql = SqlParser.parse4WhereItem(queryBlock);
if (whereSql != null) {
    sql.append(" WHERE ").append(whereSql);
} else {
    sql.append(" WHERE 1=1 ");
}

List<Object> values = new ArrayList<>();

for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
    String columnName = fkFieldItem.getColumn().getColumnName();
    Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
    sql.append(" AND ").append(columnName).append("=? ");
    values.add(value);
}

String groupSql = SqlParser.parse4GroupBy(queryBlock);
if (groupSql != null) {
    sql.append(groupSql);
}

DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
if (logger.isTraceEnabled()) {
    logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
        config.getDestination(),
        dml.getTable(),
        mapping.get_index(),
        sql.toString().replace("\n", " "));
}
Util.sqlRS(ds, sql.toString(), values, rs -> {
    try {
        ### **while (rs.next()) {** // 在这里判断一下rs.hasNext() 如果为false,说明没有子表数据了,则将主表对应属性清空
            Map<String, Object> esFieldData = new LinkedHashMap<>();

            for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                if (old != null) {
                    out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
                        for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
                            if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
                                for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                    if (old.containsKey(Util.cleanColumn(columnItem.getColumnName()))) {
                                        Object val = esTemplate.getValFromRS(mapping,
                                            rs,
                                            fieldItem.getFieldName(),
                                            fieldItem.getColumn().getColumnName());
                                        esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                        break out;
                                    }
                                }
                        }
                    }
                } else {
                    Object val = esTemplate.getValFromRS(mapping,
                        rs,
                        fieldItem.getFieldName(),
                        fieldItem.getColumn().getColumnName());
                    esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                }
            }

            Map<String, Object> paramsTmp = new LinkedHashMap<>();
            for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                for (FieldItem fieldItem : entry.getValue()) {
                    if (fieldItem.getColumnItems().size() == 1) {
                        Object value = esTemplate.getValFromRS(mapping,
                            rs,
                            fieldItem.getFieldName(),
                            entry.getKey().getColumn().getColumnName());
                        String fieldName = fieldItem.getFieldName();
                        // 判断是否是主键
                        if (fieldName.equals(mapping.get_id())) {
                            fieldName = "_id";
                        }
                        paramsTmp.put(fieldName, value);
                    }
                }
            }

            if (logger.isDebugEnabled()) {
                logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
                    config.getDestination(),
                    dml.getTable(),
                    mapping.get_index());
            }
            esTemplate.updateByQuery(config, paramsTmp, esFieldData);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return 0;
});

}
@1512033796 在代码 while (rs.next()) 这个地方判断

whhtz7ly

whhtz7ly6#

但是不优雅,不知道作者有没有好的建议

byqmnocz

byqmnocz7#

但是不优雅,不知道作者有没有好的建议

大佬直接pr啊

euoag5mw

euoag5mw8#

我canal1.1.5也是有这种问题,找到解决方法没呢?

nwsw7zdq

nwsw7zdq9#

我canal1.1.5也是有这种问题,找到解决方法没呢?

解决方法就是尽量在数据库中弄一个单表数据同步到es,因为你用子表写法,不但会出现上面的问题,同步的效率还很慢。我主表子表都是80万数据,修改子表数据时,单表同步可以实现到1秒内,子表同步至少需要10秒,还是没有算消息拥堵的情况。

相关问题