我正在使用debezium Kafka connect将postgresql数据库CDC事件流式传输到Kafka群集。目前,我使用的是1.9.0版,但希望升级到2.4.0版
我们正在使用Kafka发件箱模式将数据库记录流式传输到Kafka队列。
我可以升级到2.4.0,但得到一个奇怪的警告
Unexpected update message received Struct{id=1} and ignored [io.debezium.transforms.outbox.EventRouterDelegate]
字符串
下面是我的配置
{
"value.converter.delegate.converter.type.schemas.enable": "false",
"database.password": "*****",
"database.user": "postgres",
"publication.name": "dbz_outbox_pub",
"slot.name": "debezium2",
"topic.creation.default.cleanup.policy": "compact",
"heartbeat.topic.prefix": "__debezium-heartbeat",
"tasks.max": "1",
"transforms": "outbox",
"topic.creation.default.partitions": "1",
"heartbeat.interval.ms": "60000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"transforms.outbox.table.field.event.key": "aggregateid",
"publication.autocreate.mode": "filtered",
"topic.creation.default.replication.factor": "1",
"heartbeat.action.query": "CREATE TABLE IF NOT EXISTS debezium_heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP WITH TIME ZONE);\nINSERT INTO debezium_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;",
"plugin.name": "pgoutput",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"table.include.list": "public.kafkaoutbox,public.debezium_heartbeat",
"topic.creation.default.compression.type": "uncompressed",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"database.dbname": "superipdev_cellb",
"value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
"database.port": "5432",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"tombstones.on.delete": "false",
"key.converter.delegate.converter.type.schemas.enable": "false",
"event.processing.failure.handling.mode": "warn",
"schema.include.list": "",
"snapshot.mode": "never",
"database.hostname": "host.docker.internal",
"topic.prefix": "cell_b",
"database.server.name": "cell_b",
"name": "debezium2"
}
型
以前有人遇到过这种情况吗?任何帮助都将不胜感激。
1条答案
按热度按时间x9ybnkn61#
似乎您正在更新发件箱表。Debezium上的SMT
EventRouterDelegate
将忽略除INSERT
之外的所有操作,因为您不应该更新发件箱表中的记录。代码如下