我是kafka/kafka connect的新手,我遇到了一个关于合流jdbc连接器的问题。目前我正在利用汇合社区docker撰写。
我可以成功地创建一个从mysql数据库读入kafka的源代码。
curl -X POST \
-H "Content-Type: application/json" \
--data '{ "name": "college_mysql_source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "mode": "timestamp+incrementing", "timestamp.column.name": "updated_on", "topic.prefix": "college_mysql_", "poll.interval.ms": 1000, "table.whitelist": "college" } }' \
http://localhost:8083/connectors
数据按预期进入Kafka,每一列都用avro正确表示。如果我通过cli创建消费者,我可以看到数据是正确的。
{
"id":112525,
"pim_id":{"long":78806},
"college_name":{"string":"Western University of Health Sciences"},
...
}
如果我创建一个简单的jdbc接收器,将数据放入另一个mysql db,一切都会很好:
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "insert"}}' \
http://localhost:8083/connectors
我们正确地创建了一个表,新的记录在所有字段(包括id)都正确填充的情况下运行良好。但是,如果我改为创建一个使用插入模式upsert的接收器,我就会开始出错。
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "upsert", "pk.mode": "record_key", "pk.fields": "id"}}' \
http://localhost:8083/connectors
这将正确地创建表并正确地建立 id
作为主键,到目前为止还不错,但现在每当它从主题中读取时,我们都会得到一个错误: java.sql.BatchUpdateException: Column 'id' cannot be null
这就是我被困的地方。主题中的数据正确地有一个id字段,如果我没有将该列声明为pk,则该id字段将在id列中使用。我尝试过自己定义表,而不是让sink来创建表,我认为表的创建可能有一些奇怪的问题,但我似乎没有得到完全相同的错误。任何关于这方面的建议或方向将不胜感激,我希望解决办法是简单的,我只是错过了一些明显的,那些有更多经验的将能够指出给我。
谢谢!
1条答案
按热度按时间wfveoks01#
需要设置“pk.mode”:“记录\u值”