canal 基于触发器的更新操作无法识别到主键,可能导致回放SQL被截断

disbfnqx  于 23天前  发布在  其他
关注(0)|答案(1)|浏览(20)
  • I have searched the issues of this repository and believe that this is not a duplicate.
  • I have checked the FAQ of this repository and believe that this is not a duplicate.

environment

  • canal 1.1.7
  • mysql 5.7.44

Issue Description

使用 canal 将 MySQL 同步到 OceanBase(OB)时,单表的 UPDATE 操作同步正常;但由触发器引发的 UPDATE 操作无法识别到主键(日志中 pkNames 为空),这可能是回放 SQL 被截断的原因。

If there is an exception, please attach the exception trace:

# 源端MySQL 单表
-- Source-side table for the single-table case: integer primary key `id`
-- plus a nullable `name` column. No trigger is defined on it in this script.
CREATE TABLE t2 (
id int(11) primary key not null ,
name varchar(100)
);

#  源端MySQL 基于触发器更新
-- Source-side table for the trigger case: the AFTER INSERT/UPDATE/DELETE
-- triggers defined on it copy every row change into t1_log.
CREATE TABLE t1 (
    id   INT(11)      NOT NULL PRIMARY KEY,
    name VARCHAR(100)
);

-- Shadow table with the same columns as t1. It is written only by the
-- triggers on t1 in this script.
CREATE TABLE t1_log (
    id   INT(11)      NOT NULL PRIMARY KEY,
    name VARCHAR(100)
);

-- Recreate the AFTER INSERT trigger that copies each new t1 row into t1_log.
-- Trigger names can be case-sensitive on case-sensitive file systems, so the
-- exact name used by CREATE TRIGGER is dropped in addition to the lowercase
-- spelling. Without that, rerunning the script can fail because the trigger
-- already exists.
DROP TRIGGER IF EXISTS insert_t1_trigger;
DROP TRIGGER IF EXISTS INSERT_T1_TRIGGER;
DELIMITER #
CREATE TRIGGER INSERT_T1_TRIGGER AFTER INSERT ON t1 FOR EACH ROW BEGIN
    INSERT INTO t1_log(id,name)
    VALUES(new.id,new.name);
END;#
DELIMITER ;

-- Recreate the AFTER UPDATE trigger that mirrors a name change on t1 into the
-- t1_log row with the same (old) id.
-- Trigger names can be case-sensitive on case-sensitive file systems, so the
-- exact name used by CREATE TRIGGER is dropped in addition to the lowercase
-- spelling. Without that, rerunning the script can fail because the trigger
-- already exists.
DROP TRIGGER IF EXISTS update_t1_trigger;
DROP TRIGGER IF EXISTS UPDATE_T1_TRIGGER;
DELIMITER #
CREATE TRIGGER UPDATE_T1_TRIGGER AFTER UPDATE ON t1 FOR EACH ROW BEGIN
    UPDATE t1_log SET name=new.name WHERE id=old.id;
END;#
DELIMITER ;

-- Recreate the AFTER DELETE trigger that removes the matching t1_log row
-- whenever a row is deleted from t1.
-- Trigger names can be case-sensitive on case-sensitive file systems, so the
-- exact name used by CREATE TRIGGER is dropped in addition to the lowercase
-- spelling. Without that, rerunning the script can fail because the trigger
-- already exists.
DROP TRIGGER IF EXISTS delete_t1_trigger;
DROP TRIGGER IF EXISTS DELETE_T1_TRIGGER;
DELIMITER #
CREATE TRIGGER DELETE_T1_TRIGGER AFTER DELETE ON t1 FOR EACH ROW BEGIN
    DELETE FROM t1_log WHERE id = old.id;
END;#
DELIMITER ;

# canal client 端rdb配置
## Mirror schema synchronize config
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: testdb
  mapAll: true

# canal client 日志 
2024-01-08 10:02:19.353 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
2024-01-08 10:02:20.001 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"testdb","destination":"example","es":1704708127
000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE t2 (\nid int(11) primary key not null ,\nname varchar(100)\n)","table":"t2","ts":1704708139908,"type":"C
REATE"}
2024-01-08 10:02:20.002 [pool-8-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"testdb","destination":"example","es":170470812700
0,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE t2 (\nid int(11) primary key not null ,\nname varchar(100)\n)","table":"t2","ts":1704708139908,"type":"CRE
ATE"}
2024-01-08 10:03:06.840 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"ff"}],"database":"testdb","destination":"exampl
e","es":1704708186000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t2","ts":1704708186835,"type":"INSERT"}
2024-01-08 10:03:06.877 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"ff"},"database":"testdb","destination":"example",
"old":null,"table":"t2","type":"INSERT"}
2024-01-08 10:03:30.949 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"9"}],"database":"testdb","destination":"example
","es":1704708210000,"groupId":"g1","isDdl":false,"old":[{"name":"ff"}],"pkNames":["id"],"sql":"","table":"t2","ts":1704708210949,"type":"UPDATE"}
2024-01-08 10:03:30.962 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"9"},"database":"testdb","destination":"example","
old":{"name":"ff"},"table":"t2","type":"UPDATE"}
2024-01-08 10:05:41.265 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":8,"name":"ho"}],"database":"testdb","destination":"exampl
e","es":1704708340000,"groupId":"g1","isDdl":false,"old":[{"name":"9"}],"pkNames":[],"sql":"","table":"t1","ts":1704708341264,"type":"UPDATE"}
2024-01-08 10:05:41.265 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":8,"name":"ho"}],"database":"testdb","destination":"exampl
e","es":1704708340000,"groupId":"g1","isDdl":false,"old":[{"name":"9"}],"pkNames":[],"sql":"","table":"t1_log","ts":1704708341265,"type":"UPDATE"}
2024-01-08 10:05:41.285 [pool-5-thread-1] ERROR c.a.o.canal.client.adapter.rdb.service.RdbSyncService - Failed to sync single DML: {"data":{"id":8,"name":"ho"},"database":"testdb","d
estination":"example","old":{"name":"9"},"table":"t1","type":"UPDATE"}
2024-01-08 10:05:41.289 [pool-8-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.l
ang.RuntimeException: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds t
o your OceanBase version for the right syntax to use near 'WH' at line 1
java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdb
c4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your OceanBase version for the right syntax to use near 'WH' at line 1
        at com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter.sync(RdbAdapter.java:161)
        at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42)
        at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139)
        at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97)
        at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:895)

#  基于触发器更新的binlog内容
# at 931
#240108 14:12:29 server id 1  end_log_pos 981 CRC32 0x197cf37b  Table_map: `testdb`.`t1` mapped to number 113
# at 981
#240108 14:12:29 server id 1  end_log_pos 1035 CRC32 0x1d6d386b         Table_map: `testdb`.`t1_log` mapped to number 127
# at 1035
#240108 14:12:29 server id 1  end_log_pos 1090 CRC32 0x14fc93e0         Write_rows: table id 113
#240108 10:05:40 server id 1  end_log_pos 499 CRC32 0xaf5b1a8d  Update_rows: table id 127 flags: STMT_END_F

BINLOG '
9MibZRMBAAAAMgAAAFcBAAAAAHEAAAAAAAEABnRlc3RkYgACdDEAAgMPAmQAApBPBM8=
9MibZRMBAAAANgAAAI0BAAAAAH8AAAAAAAEABnRlc3RkYgAGdDFfbG9nAAIDDwJkAALvCOJq
9MibZR8BAAAAMwAAAMABAAAAAHEAAAAAAAAAAgAC///8CAAAAAE5/AgAAAACaG9UQTyz
9MibZR8BAAAAMwAAAPMBAAAAAH8AAAAAAAEAAgAC///8CAAAAAE5/AgAAAACaG+NGluv
'/*!*/;
### UPDATE `testdb`.`t1`
### WHERE
###   @1=8 /* INT meta=0 nullable=0 is_null=0 */
###   @2='9' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
### SET
###   @1=8 /* INT meta=0 nullable=0 is_null=0 */
###   @2='ho' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
### UPDATE `testdb`.`t1_log`
### WHERE
###   @1=8 /* INT meta=0 nullable=0 is_null=0 */
###   @2='9' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
### SET
###   @1=8 /* INT meta=0 nullable=0 is_null=0 */
###   @2='ho' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
# at 499
#240108 10:05:40 server id 1  end_log_pos 530 CRC32 0xde6481a6  Xid = 1150
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;

# 单表更新的binlog 内容

#240108 10:03:30 server id 1  end_log_pos 871 CRC32 0x56282147  Update_rows: table id 128 flags: STMT_END_F

BINLOG '
csibZRMBAAAAMgAAADQDAAAAAIAAAAAAAAEABnRlc3RkYgACdDIAAgMPAmQAAvea+RY=
csibZR8BAAAAMwAAAGcDAAAAAIAAAAAAAAEAAgAC///8AQAAAAJmZvwBAAAAATlHIShW
'/*!*/;
### UPDATE `testdb`.`t2`
### WHERE
###   @1=1 /* INT meta=0 nullable=0 is_null=0 */
###   @2='ff' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
### SET
###   @1=1 /* INT meta=0 nullable=0 is_null=0 */
###   @2='9' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
# at 871
#240108 10:03:30 server id 1  end_log_pos 902 CRC32 0x46f51ccb  Xid = 1139
COMMIT/*!*/;
xytpbqjk

xytpbqjk1#

debug看下生成的DML语句是啥,看着是语法报错

相关问题