Kafka 如何使用postgres JDBC sink连接器流到mysql连接器

ejk8hzay  于 2023-10-15  发布在  Apache
关注(0)|答案(2)|浏览(157)

星期四快乐!我一直在尝试在Debezium中创建一个连接器Postgres Connector,但我只能在表已经存在于我的MySQL示例中时才能捕获更改,这并不理想。因为那样的话,我就不得不用Python写一个脚本来处理这样的事件,使用已经存在的东西可能比重新发明轮子更容易。我希望能够捕捉到实际连接器中的错误。我偶然发现了这篇博客文章。https://debezium.io/blog/2017/09/25/streaming-to-another-database/和我得到了它在我的本地设置工作,这是伟大的,但唯一的问题是,我想在相反的方向去。(我能够捕获新记录、删除记录和更新记录,如果它们不存在,它也会创建新表和新列)。我想从postgres流,并有连接器插入到mysql中的目标数据库。我试着分别切换jdbc源连接器和接收连接器,但是我没有把新记录从postgres插入mysql。似乎我可以找到人们从mysql插入到postgres的所有地方,但不是另一个方向。这里是我设置的GitHub目录,让mysql-kafka-postgres工作。https://github.com/debezium/debezium-examples/tree/main/unwrap-smt
我尝试了另一种方式,但它似乎正在杀死我的Docker映像,因为我 Boot 时说“无法从bootstrap解析服务器Kafka:9092。servers as DNS resolution failed for Kafka [org.apache.Kafka.clients.ClientUtils]”这是我的源json和汇json。

{
"name": "jdbc-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.public.customers",
    "connection.url": "jdbc:mysql://mysql:3306/inventory",
    "connection.user": "debezium",
    "connection.password": "dbz",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
}
}

{
"name": "inventory-connector",
"config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "psql",
    "mode": "bulk",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgresuser",
    "database.password": "postgrespw",
    "database.dbname": "inventory",
    "table.include.list": "public.customers",
    "slot.name": "test_slot",
    "plugin.name": "wal2json",
    "database.server.name": "psql",
    "tombstones.on.delete": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
}
}

其他的一切都是一样的,在我所关注的博客上。欢迎任何帮助。

iaqfqrcu

iaqfqrcu1#

我相信这里有两个不同的问题:
1.如何处理MySQL中不存在的列JDBC接收器连接器应该有一个名为auto.create的标志,如果设置为true,则允许连接器在表不存在时创建表(auto.evolve还允许表演化)

  1. PG -> Kafka -> Mysql是可能的,你可以找到一个例子,我写了一段时间前here。示例使用Aiven for PostgreSQL和Aiven for Apache Kafka,但您应该能够调整连接器以在任何类型的PG和Kafka中工作。
    如果你知道你的PG->Kafka->MySQL管道停止工作会很有趣。
    免责声明:我为Aiven工作
kgsdhlau

kgsdhlau2#

我有一个例子here。我相信这是非常类似的Flume时使用的mysql发送。

相关问题