otter load delete fails

polhcujo  posted on 2022-11-02  in Other

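Under concurrent testing, with the sequence insert -> update id -> delete, the row is occasionally not deleted on the target. The load delete log entry is present, and no exception is reported.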

load insert log:

row_load-2019-04-24-0.log-- PairId: 44 , TableId: 13 , EventType : I , Time : 1556042711000
row_load-2019-04-24-0.log-- Consistency :  , Mode :
row_load-2019-04-24-0.log------------------
row_load-2019-04-24-0.log----Pks
row_load-2019-04-24-0.log-	EventColumn[index=0,columnType=-5,columnName=id,columnValue=390442,isNull=false,isKey=true,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log----oldPks
row_load-2019-04-24-0.log-
row_load-2019-04-24-0.log----Columns
row_load-2019-04-24-0.log:	EventColumn[index=1,columnType=-5,columnName=traceId,columnValue=1997861556042711096,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=2,columnType=12,columnName=status,columnValue=test,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=3,columnType=12,columnName=userId,columnValue=dxy_test,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=4,columnType=93,columnName=createTime,columnValue=2019-04-24 02:05:11,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=5,columnType=93,columnName=modifyTime,columnValue=2018-07-19 11:05:48,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log----Sql
row_load-2019-04-24-0.log-	insert into `otter_db_1`.`message`(`traceId` , `status` , `userId` , `createTime` , `modifyTime` , `id`) values (? , ? , ? , ? , ? , ?) on duplicate key update `traceId`=values(`traceId`) , `status`=values(`status`) , `userId`=values(`userId`) , `createTime`=values(`createTime`) , `modifyTime`=values(`modifyTime`) , `id`=values(`id`)

load delete log:

row_load-2019-04-24-0.log------------------
row_load-2019-04-24-0.log-- PairId: 44 , TableId: 13 , EventType : D , Time : 1556042714000
row_load-2019-04-24-0.log-- Consistency :  , Mode :
row_load-2019-04-24-0.log------------------
row_load-2019-04-24-0.log----Pks
row_load-2019-04-24-0.log-	EventColumn[index=0,columnType=3,columnName=id,columnValue=390442,isNull=false,isKey=true,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log----oldPks
row_load-2019-04-24-0.log-
row_load-2019-04-24-0.log----Columns
row_load-2019-04-24-0.log:	EventColumn[index=1,columnType=-5,columnName=traceId,columnValue=1997861556042711096,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=2,columnType=12,columnName=status,columnValue=updateId,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=3,columnType=12,columnName=userId,columnValue=390442,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=4,columnType=93,columnName=createTime,columnValue=2019-04-24 02:05:11,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log-	EventColumn[index=5,columnType=93,columnName=modifyTime,columnValue=2019-04-24 02:05:13,isNull=false,isKey=false,isUpdate=true,isReverse=false]
row_load-2019-04-24-0.log----Sql
row_load-2019-04-24-0.log-	delete from `otter_db_1`.`message` where  `id` = ?
row_load-2019-04-24-0.log------------------

The SQL that was actually executed:

21-2019-04-24-3.log:2019-04-24 02:06:01.786 [pipelineId = 21 , pipelineName = sync , DbLoadAction] WARN  com.alibaba.otter.node.etl.load.loader.db.DbLoadAction - sql23:delete from `otter_db_1`.`message` where  `id` = ?   ---  columns:[EventColumn[index=0,columnType=3,columnName=id,columnValue=390442,isNull=false,isKey=true,isUpdate=true,isReverse=false]]
21-2019-04-24-3.log:2019-04-24 02:06:01.879 [pipelineId = 21 , pipelineName = sync , DbLoadAction] WARN  com.alibaba.otter.node.etl.load.loader.db.DbLoadAction - sql23:insert into `otter_db_1`.`message`(`traceId` , `status` , `userId` , `createTime` , `modifyTime` , `id`) values (? , ? , ? , ? , ? , ?) on duplicate key update `traceId`=values(`traceId`) , `status`=values(`status`) , `userId`=values(`userId`) , `createTime`=values(`createTime`) , `modifyTime`=values(`modifyTime`) , `id`=values(`id`)  ---  columns:[EventColumn[index=1,columnType=-5,columnName=traceId,columnValue=1997861556042711096,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=2,columnType=12,columnName=status,columnValue=test,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=3,columnType=12,columnName=userId,columnValue=dxy_test,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=4,columnType=93,columnName=createTime,columnValue=2019-04-24 02:05:11,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=5,columnType=93,columnName=modifyTime,columnValue=2018-07-19 11:05:48,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=0,columnType=-5,columnName=id,columnValue=390442,isNull=false,isKey=true,isUpdate=true,isReverse=false]]

Why is the delete executed first and the insert only afterwards?

kuarbcqp  #1

The cause has been found. I added a log statement to DbLoadMerger.merge that prints the full merged result together with the processId. The code is:

// DbLoadMerger.merge with one extra log line that dumps the merged result per processId.
public static List<EventData> merge(Identity identity, List<EventData> eventDatas) {
    Map<RowKey, EventData> result = new ConcurrentHashMap<RowKey, EventData>();
    for (EventData eventData : eventDatas) {
        merge(eventData, result);
    }
    // Added for debugging: print the merged map, keyed by RowKey.
    logger.info("processId数据:{} result:{}", identity.getProcessId(), result.toString());
    return new LinkedList<EventData>(result.values());
}

Log output:

2019-04-25 06:11:55.506 [Otter-Seda-Executor-19] INFO  c.a.o.shared.arbitrate.impl.setl.monitor.PermitMonitor - processId数据:856628 result:{
RowKey{tableId=null, schemaName='otter_db_1', tableName='message', keys=[EventColumn[index=0,columnType=3,columnName=id,columnValue=707594,isNull=false,isKey=true,isUpdate=true,isReverse=false]]}=EventData{tableId=13, tableName='message', schemaName='otter_db', eventType=DELETE, executeTime=1556143906000, oldKeys=[], keys=[EventColumn[index=0,columnType=3,columnName=id,columnValue=707594,isNull=false,isKey=true,isUpdate=true,isReverse=false]], columns=[EventColumn[index=1,columnType=-5,columnName=traceId,columnValue=1640231556143902530,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=2,columnType=12,columnName=status,columnValue=updateId,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=3,columnType=12,columnName=userId,columnValue=707594,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=4,columnType=93,columnName=createTime,columnValue=2019-04-25 06:11:42,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=5,columnType=93,columnName=modifyTime,columnValue=2019-04-25 06:11:45,isNull=false,isKey=false,isUpdate=true,isReverse=false]], size=198, pairId=44, sql='null', ddlSchemaName='null', syncMode=null, syncConsistency=null, remedy=false, hint='null', withoutSchema=false, shardingScheme='otter_db_1', shardingTabele='message', shardingDSDTOList=[]},
RowKey{tableId=null, schemaName='otter_db_1', tableName='message', keys=[EventColumn[index=0,columnType=-5,columnName=id,columnValue=707594,isNull=false,isKey=true,isUpdate=true,isReverse=false]]}=EventData{tableId=13, tableName='message', schemaName='otter_db', eventType=INSERT, executeTime=1556143902000, oldKeys=[], keys=[EventColumn[index=0,columnType=-5,columnName=id,columnValue=707594,isNull=false,isKey=true,isUpdate=true,isReverse=false]], columns=[EventColumn[index=1,columnType=-5,columnName=traceId,columnValue=1640231556143902530,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=2,columnType=12,columnName=status,columnValue=test,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=3,columnType=12,columnName=userId,columnValue=dxy_test,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=4,columnType=93,columnName=createTime,columnValue=2019-04-25 06:11:42,isNull=false,isKey=false,isUpdate=true,isReverse=false], EventColumn[index=5,columnType=93,columnName=modifyTime,columnValue=2018-07-19 11:05:48,isNull=false,isKey=false,isUpdate=true,isReverse=false]], size=76, pairId=44, sql='null', ddlSchemaName='null', syncMode=null, syncConsistency=null, remedy=false, hint='null', withoutSchema=false, shardingScheme='otter_db_1', shardingTabele='message', shardingDSDTOList=[]}
}

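The log shows that the key column of the DELETE event has columnType=3, while the key column of the INSERT event has columnType=-5 (these values match the java.sql.Types codes DECIMAL and BIGINT). Because columnType is part of the key's equality, the two EventData objects are not treated as the same primary key and end up under two different RowKey entries, so the events are not merged. At load time the delete is then executed before the insert, which leaves the data inconsistent.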
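
The effect can be reproduced in isolation. The sketch below is not otter code: `KeyColumn` is a hypothetical, reduced stand-in for EventColumn with only three fields, and `distinctRows` is an illustrative helper. It only shows that a key whose equality includes the column type yields two map entries for the same id when the types differ.

```java
import java.sql.Types;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class ColumnTypeKeyDemo {

    // Hypothetical stand-in for EventColumn; the type IS part of equality here.
    static final class KeyColumn {
        final String name;
        final int type;
        final String value;

        KeyColumn(String name, int type, String value) {
            this.name = name;
            this.type = type;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof KeyColumn)) return false;
            KeyColumn other = (KeyColumn) o;
            return type == other.type
                && name.equals(other.name)
                && value.equals(other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, type, value);
        }
    }

    // Keys an INSERT and a DELETE for the same id and returns the entry count.
    static int distinctRows(int insertType, int deleteType) {
        Map<KeyColumn, String> merged = new LinkedHashMap<>();
        merged.put(new KeyColumn("id", insertType, "707594"), "INSERT");
        merged.put(new KeyColumn("id", deleteType, "707594"), "DELETE");
        return merged.size();
    }

    public static void main(String[] args) {
        // Types.BIGINT is -5 and Types.DECIMAL is 3, as in the log above.
        System.out.println(distinctRows(Types.BIGINT, Types.DECIMAL)); // prints 2
        System.out.println(distinctRows(Types.BIGINT, Types.BIGINT));  // prints 1
    }
}
```

Note also that the merge method shown earlier collects results in a ConcurrentHashMap, whose iteration order is not the insertion order. Once the two events sit under different keys, nothing in that map guarantees that the insert is returned before the delete.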

The current workaround is to override hashCode and equals of EventColumn (the element type of keys in RowKey) so that columnType is no longer compared:

@Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((columnName == null) ? 0 : columnName.hashCode());
        // With batched insert, PK update and delete, the same key can arrive with different
        // columnType values (columnType=3 vs columnType=-5), so columnType is left out.
        // result = prime * result + columnType;
        result = prime * result + ((columnValue == null) ? 0 : columnValue.hashCode());
        result = prime * result + index;
        result = prime * result + (isKey ? 1231 : 1237);
        result = prime * result + (isNull ? 1231 : 1237);
        result = prime * result + (isUpdate ? 1231 : 1237);
        result = prime * result + (isReverse ? 1231 : 1237);
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        EventColumn other = (EventColumn) obj;
        if (columnName == null) {
            if (other.columnName != null) return false;
        } else if (!columnName.equals(other.columnName)) return false;
        // columnType is intentionally not compared, to stay consistent with hashCode().
        // if (columnType != other.columnType) return false;
        if (columnValue == null) {
            if (other.columnValue != null) return false;
        } else if (!columnValue.equals(other.columnValue)) return false;
        if (index != other.index) return false;
        if (isKey != other.isKey) return false;
        if (isNull != other.isNull) return false;
        if (isUpdate != other.isUpdate) return false;
        if (isReverse != other.isReverse) return false;
        return true;
    }
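
To illustrate what the workaround changes, here is a minimal sketch under stated assumptions. `KeyColumn` is again a hypothetical, reduced stand-in for EventColumn, and `merge` is a deliberately simplified "last event for a key wins" rule, not otter's actual DbLoadMerger logic. With the type excluded from equals and hashCode, the INSERT and the DELETE for the same id share one key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class TypeInsensitiveKeyDemo {

    // Hypothetical stand-in for EventColumn; the type is stored but ignored for identity.
    static final class KeyColumn {
        final String name;
        final int type;
        final String value;

        KeyColumn(String name, int type, String value) {
            this.name = name;
            this.type = type;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof KeyColumn)) return false;
            KeyColumn other = (KeyColumn) o;
            return name.equals(other.name) && value.equals(other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, value);
        }
    }

    // Simplified merge (an assumption for this sketch): a later event replaces
    // the earlier event stored under an equal key.
    static List<String> merge(KeyColumn[] keys, String[] eventTypes) {
        Map<KeyColumn, String> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            result.put(keys[i], eventTypes[i]);
        }
        return new ArrayList<>(result.values());
    }

    public static void main(String[] args) {
        KeyColumn[] keys = {
            new KeyColumn("id", -5, "707594"), // key as seen on the INSERT
            new KeyColumn("id", 3, "707594")   // key as seen on the DELETE
        };
        System.out.println(merge(keys, new String[] {"INSERT", "DELETE"})); // prints [DELETE]
    }
}
```

One design point to keep in mind: changing equals and hashCode on the column class affects every place that compares columns, not only the row key, so it may be worth checking whether any other comparison relies on the type.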

iyr7buue  #2

Which version are you running?

lokaqttq  #3

@ppj19891020 Could you share the MySQL version and the SQL for every operation performed on the source data? Why does columnType change?

ppcbkaq5  #4

My guess is that some DDL statement was executed on the source and DDL synchronization was not configured.
