debezium没有为mysql的嵌入式版本提供cdc

s71maibg  于 2021-06-17  发布在  Mysql
关注(0)|答案(1)|浏览(524)

我正在使用以下依赖项,

  1. <dependency>
  2. <groupId>io.debezium</groupId>
  3. <artifactId>debezium-connector-oracle</artifactId>
  4. <version>${version.debezium}</version>
  5. </dependency>
  6. <!-- https://mvnrepository.com/artifact/io.debezium/debezium-connector-mysql -->
  7. <dependency>
  8. <groupId>io.debezium</groupId>
  9. <artifactId>debezium-connector-mysql</artifactId>
  10. <version>${version.debezium}</version>
  11. </dependency>
  12. <version.debezium>0.8.3.Final</version.debezium>

下面是我的java方法,

  1. public void runMysqlParsser() {
  2. Configuration config = Configuration.create()
  3. /* begin engine properties */
  4. .with("connector.class",
  5. "io.debezium.connector.mysql.MySqlConnector")
  6. .with("offset.storage",
  7. "org.apache.kafka.connect.storage.FileOffsetBackingStore")
  8. .with("offset.storage.file.filename",
  9. "/home/mohit/tmp/offset.dat")
  10. .with("offset.flush.interval.ms", 60000)
  11. /* begin connector properties */
  12. .with("name", "my-sql-connector")
  13. .with("database.hostname", "localhost")
  14. .with("database.port", 3306)
  15. .with("database.user", "root")
  16. .with("database.password", "root")
  17. .with("server.id", 1)
  18. .with("database.server.name", "my-app-connector")
  19. .with("database.history",
  20. "io.debezium.relational.history.FileDatabaseHistory")
  21. .with("database.history.file.filename",
  22. "/home/mohit/tmp/dbhistory.dat")
  23. .with("database.whitelist", "mysql")
  24. .with("table.whitelist", "mysql.customers")
  25. .build();
  26. EmbeddedEngine engine = EmbeddedEngine.create()
  27. .using(config)
  28. .notifying(this::handleEvent)
  29. .build();
  30. Executor executor = Executors.newSingleThreadExecutor();
  31. executor.execute(engine);
  32. }
  33. private void handleEvent(SourceRecord sourceRecord) {
  34. try {
  35. LOG.info("Got record :" + sourceRecord.toString());
  36. } catch (Exception ex) {
  37. LOG.info("exception in handle event:" + ex);
  38. }

我的sql配置。

  1. general_log_file = /var/log/mysql/mysql.log
  2. general_log = 1
  3. server-id = 1
  4. log_bin = /var/log/mysql/mysql-bin.log
  5. expire_logs_days = 10
  6. max_binlog_size = 100M
  7. binlog_format = row
  8. binlog_row_image = full
  9. binlog_rows_query_log_events = on
  10. gtid_mode = on
  11. enforce_gtid_consistency = on

当我运行这段代码时,我得到了历史日志的偏移量,mysql.log文件也得到了添加到其中的偏移量。但是,当我对表执行任何update语句时,它不会给我任何日志,即handleevent方法没有被调用。有人能告诉我代码或配置有什么问题吗?
下面是运行java代码后的日志,

  1. $$ java -jar debezium-gcp-1.0-SNAPSHOT-jar-with-dependencies.jar

log4j:warn找不到logger的appender(org.apache.kafka.connect.json.jsonverterconfig)。log4j:warn请正确初始化log4j系统。
log4j:请参阅http://logging.apache.org/log4j/1.2/faq.html#noconfig 更多信息。2018年11月28日下午1:29:47 com.debezium.gcp.samplemysqlembeddedbezium handleevent info:got record:sourcerecord{sourcepartition={server=my app connector},sourceoffset={file=mysql-bin.000002,pos=980,gtids=31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17,snapshot=true}}connectrecord{topic my-app-connector',kafkapartition=0,key=struct{databasename=},value=struct{source=struct{version=0.8.3.final,name=my app connector,server\u id=0,ts\u sec=0,file=mysql bin.000002,pos=980,row=0,snapshot=true},databasename=,ddl=set character\u set\u server=latin1,collation\u server=latin1\u swedish\u ci;},timestamp=null,headers=connectheaders(headers=)}2018年11月28日下午1:29:47 com.github.shyiko.mysql.binlog.binarylogclient连接信息:已连接到localhost:3306 at 31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17(sid:6326, cid:21)

vd8tlhqk

vd8tlhqk1#

您是否在白名单中列出正确的数据库/表?
你能看看这个演示吗-https://github.com/debezium/debezium-examples/tree/master/kinesis 只需删除与动觉相关的代码,只打印到控制台。同时检查 table.ignore.builtin 配置选项。伊姆霍 mysql 数据库属于内置数据库,默认情况下会被过滤掉。

相关问题