星期四快乐!我一直在尝试在Debezium中创建一个连接器Postgres Connector,但我只能在表已经存在于我的MySQL示例中时才能捕获更改,这并不理想。因为那样的话,我就不得不用Python写一个脚本来处理这样的事件,使用已经存在的东西可能比重新发明轮子更容易。我希望能够捕捉到实际连接器中的错误。我偶然发现了这篇博客文章。https://debezium.io/blog/2017/09/25/streaming-to-another-database/和我得到了它在我的本地设置工作,这是伟大的,但唯一的问题是,我想在相反的方向去。(我能够捕获新记录、删除记录和更新记录,如果它们不存在,它也会创建新表和新列)。我想从postgres流,并有连接器插入到mysql中的目标数据库。我试着分别切换jdbc源连接器和接收连接器,但是我没有把新记录从postgres插入mysql。似乎我可以找到人们从mysql插入到postgres的所有地方,但不是另一个方向。这里是我设置的GitHub目录,让mysql-kafka-postgres工作。https://github.com/debezium/debezium-examples/tree/main/unwrap-smt
我尝试了另一种方式,但它似乎正在杀死我的Docker映像,因为我 Boot 时说“无法从bootstrap解析服务器Kafka:9092。servers as DNS resolution failed for Kafka [org.apache.Kafka.clients.ClientUtils]”这是我的源json和汇json。
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "postgres.public.customers",
"connection.url": "jdbc:mysql://mysql:3306/inventory",
"connection.user": "debezium",
"connection.password": "dbz",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "psql",
"mode": "bulk",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname": "inventory",
"table.include.list": "public.customers",
"slot.name": "test_slot",
"plugin.name": "wal2json",
"database.server.name": "psql",
"tombstones.on.delete": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
}
}
其他的一切都是一样的,在我所关注的博客上。欢迎任何帮助。
2条答案
按热度按时间iaqfqrcu1#
我相信这里有两个不同的问题:
1.如何处理MySQL中不存在的列JDBC接收器连接器应该有一个名为
auto.create
的标志,如果设置为true
,则允许连接器在表不存在时创建表(auto.evolve
还允许表演化)如果你知道你的PG->Kafka->MySQL管道停止工作会很有趣。
免责声明:我为Aiven工作
kgsdhlau2#
我有一个例子here。我相信这是非常类似的Flume时使用的mysql发送。