apache-kafka — Configuring a JDBC sink connector from a MySQL database to a PostgreSQL database in ksqlDB

yvfmudvl · posted 2022-11-01 in Apache

I want to copy a table from a MySQL database to PostgreSQL, with ksqlDB as the stream processor. To start, I just want to copy one simple table from the source "inventory" database to the sink database (PostgreSQL). Here is the structure of the inventory database:

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+

I have logged into ksqlDB and registered a source connector with the following configuration:

CREATE SOURCE CONNECTOR inventory_connector WITH (
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname' = 'mysql',
    'database.port' = '3306',
    'database.user' = 'debezium',
    'database.password' = 'dbz',
    'database.allowPublicKeyRetrieval' = 'true',
    'database.server.id' = '223344',
    'database.server.name' = 'dbserver',
    'database.whitelist' = 'inventory',
    'database.history.kafka.bootstrap.servers' = 'broker:9092',
    'database.history.kafka.topic' = 'schema-changes.inventory',

    'transforms' = 'unwrap',
    'transforms.unwrap.type' = 'io.debezium.transforms.UnwrapFromEnvelope',
    'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable' = 'false',
    'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false'
);

Here are the topics that were created:

ksql> LIST TOPICS;

 Kafka Topic                         | Partitions | Partition Replicas
-----------------------------------------------------------------------
 _ksql-connect-configs               | 1          | 1
 _ksql-connect-offsets               | 25         | 1
 _ksql-connect-statuses              | 5          | 1
 dbserver                            | 1          | 1
 dbserver.inventory.addresses        | 1          | 1
 dbserver.inventory.customers        | 1          | 1
 dbserver.inventory.geom             | 1          | 1
 dbserver.inventory.orders           | 1          | 1
 dbserver.inventory.products         | 1          | 1
 dbserver.inventory.products_on_hand | 1          | 1
 default_ksql_processing_log         | 1          | 1
 schema-changes.inventory            | 1          | 1
-----------------------------------------------------------------------

Now I just need to copy the contents of 'dbserver.inventory.customers' into the PostgreSQL database.

ksql> PRINT 'dbserver.inventory.customers' FROM BEGINNING;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/08/29 02:39:20.772 Z, key: {"id":1001}, value: {"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1002}, value: {"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1003}, value: {"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1004}, value: {"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}, partition: 0

I have tried the following configuration for the sink connector:

CREATE SINK CONNECTOR postgres_sink WITH (
        'connector.class'= 'io.confluent.connect.jdbc.JdbcSinkConnector',
        'connection.url'= 'jdbc:postgresql://postgres:5432/inventory',
        'connection.user' = 'postgresuser',
        'connection.password' = 'postgrespw',
        'topics'= 'dbserver.inventory.customers',
        'transforms'= 'unwrap',
        'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
        'transforms.unwrap.drop.tombstones'= 'false',    
        'key.converter'= 'org.apache.kafka.connect.json.JsonConverter',
        'key.converter.schemas.enable'= 'false',
        'value.converter'= 'org.apache.kafka.connect.json.JsonConverter',
        'value.converter.schemas.enable'= 'false',
        'auto.create'= 'true',
        'insert.mode'= 'upsert',
        'auto.evolve' = 'true',
        'table.name.format' = '${topic}',
        'pk.mode'   = 'record_key',
        'pk.fields' =  'id',
        'delete.enabled'= 'true'
);

It creates the connector, but then it shows the following error:

ksqldb-server      | Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'POSTGRES_SINK' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='dbserver.inventory.customers',partition=0,offset=0,timestamp=1661740760772) with a HashMap key and null key schema.

What should the sink connector configuration be to copy this data to PostgreSQL? I have also tried creating a stream in Avro first and then using the Avro key and value converters, but that didn't work either. I think this comes down to using the right SMTs, but I'm not sure.
My end goal is to join different streams and then store the result in PostgreSQL as part of a CQRS architecture. So if anyone can share a pattern I could use in this scenario, that would be very helpful.

8hhllhi2 (answer 1)

As the error says, the key must be a primitive, not a JSON object (and not Avro).
Given the JSON shown, you need an ExtractField transform on the key:

transforms=getKey,unwrap
transforms.getKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.getKey.field=id
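Applied to the sink connector from the question, the full configuration might look like the sketch below (same connection details as in the question; 'pk.fields' now names the target column that receives the extracted key). One caveat worth hedging: with 'value.converter.schemas.enable' = 'false' the JDBC sink may still reject the schemaless values, since it needs a schema to create and write tables, so switching the value side to schema-ful JSON or Avro may also be required.

```sql
CREATE SINK CONNECTOR postgres_sink WITH (
    'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url' = 'jdbc:postgresql://postgres:5432/inventory',
    'connection.user' = 'postgresuser',
    'connection.password' = 'postgrespw',
    'topics' = 'dbserver.inventory.customers',
    -- extract the primitive id from the JSON key, then unwrap the Debezium envelope
    'transforms' = 'getKey,unwrap',
    'transforms.getKey.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.getKey.field' = 'id',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable' = 'false',
    'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'auto.create' = 'true',
    'auto.evolve' = 'true',
    'insert.mode' = 'upsert',
    'table.name.format' = '${topic}',
    'pk.mode' = 'record_key',
    'pk.fields' = 'id',
    'delete.enabled' = 'true'
);
```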

Alternatively, you could change the source connector to use IntegerConverter for the key instead of JsonConverter.
Debezium also has an older blog post covering this exact use case: https://debezium.io/blog/2017/09/25/streaming-to-another-database/
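A sketch of that alternative on the source-connector side, keeping the rest of the question's source configuration unchanged. Since Debezium emits the key as a struct ({"id":1001}), the field still has to be extracted before the converter can serialize it as a bare integer:

```sql
    -- extract the id from the Debezium key struct, then keep the existing unwrap
    'transforms' = 'getKey,unwrap',
    'transforms.getKey.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.getKey.field' = 'id',
    -- serialize the extracted key as a primitive int instead of JSON
    'key.converter' = 'org.apache.kafka.connect.converters.IntegerConverter',
```

With the key written as a primitive from the start, the sink side no longer needs its own key transform.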
