I am trying to use the Debezium engine with the PostgreSQL connector to listen for changes in my Postgres DB. Configuration:
io.debezium.config.Configuration.create()
.with("name", "author-connector")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/tmp/offsets.dat")
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", "localhost")
.with("database.port", "5432")
.with("database.user", "postgres")
.with("database.password", "postgres")
.with("database.dbname", "postgres")
.with("database.include.list", "postgres")
.with("schema.include.list", "admin")
.with("table.include.list", "admin.connector")
.with("include.schema.changes", "true")
.with("database.server.name", "author-server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/tmp/dbhistory.dat")
.with("plugin.name", "pgoutput")
.with("signal.enabled.channels", "source")
.with("topic.prefix", "admin")
.with("quarkus.http.port", "7001")
.with("signal.data.collection", "admin.signal_table")
.build();
I am using debezium-embedded version 2.4.1.Final. The connection is established correctly.
However, no changes (INSERT/UPDATE/DELETE) to admin.connector are captured.
Likewise, when I try an incremental snapshot through the signal table, I get the following error: ractIncrementalSnapshotChangeEventSource : Schema not found for table 'admin.connector', known tables [admin.signal_table]
1 Answer
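In the end I found that I only need to keep schema.include.list, which makes Debezium track every table in the listed schemas. In this setup, table.include.list was not honored, so I removed it.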
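For illustration, here is a minimal sketch of what the trimmed configuration could look like once table.include.list is removed. It uses plain java.util.Properties so it runs without Debezium on the classpath. The class name AuthorConnectorConfig is hypothetical, the "postgres" credentials are assumed to be literal strings, and the choice of which other properties to drop is my assumption rather than something the original post states.

```java
import java.util.Properties;

// Hypothetical helper; the class name is not from the original post.
class AuthorConnectorConfig {

    // Builds the embedded-engine properties, keeping schema.include.list
    // and deliberately leaving out table.include.list.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("name", "author-connector");
        props.setProperty("connector.class",
                "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "60000");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        // Assumption: user, password and database name are all "postgres".
        props.setProperty("database.user", "postgres");
        props.setProperty("database.password", "postgres");
        props.setProperty("database.dbname", "postgres");
        props.setProperty("plugin.name", "pgoutput");
        props.setProperty("topic.prefix", "admin");
        // Every table in the admin schema is captured, which also covers
        // admin.connector and admin.signal_table.
        props.setProperty("schema.include.list", "admin");
        props.setProperty("signal.enabled.channels", "source");
        props.setProperty("signal.data.collection", "admin.signal_table");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object can be handed to the embedded engine through DebeziumEngine.create(...).using(props). Because the whole admin schema is included, both the captured table and the signal table should be known to the connector, which is the situation the error message above was complaining about.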