Below are the sink connector properties:
"name": "sink-testtopic-crdb",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://roach-single:26257/sampledb?useSSL=false&reWriteBatchedInserts=true",
"topics": "testtopic",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": true,
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"print.key": "true",
"errors.tolerance": "all",
"connection.user": "root",
"connection.password": "",
"table.name.format": "schema_sample.sample_test",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "insert",
"pk.mode": "none",
"pk.fields": "none",
"batch.size": 1,
"consumer.override.max.poll.records": 1}
Below is the schema registered in the Schema Registry for "testtopic", published by posting it to the URL "http://localhost:8081/subjects/testtopic-value/versions":
{
"schema": "{\"type\":\"record\",\"name\":\"sampleTest\",\"fields\":[{\"name\":\"testid\",\"type\":\"string\"},{\"name\":\"testname\",\"type\":\"string\"}]}",
"compatibility": "BACKWARD"}
Below is the message I produced to the Kafka topic:
{"testid": "226", "testname": "testSample21"}
The messages are still not being ingested into ksqlDB, and in the Kafka Connect logs I can see the error below:
Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='testtopic', partition=0, offset=16, timestamp=1691668835004, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic testtopic to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
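For context: the AvroConverter expects every value in the Confluent wire format, i.e. a 0x00 magic byte followed by a 4-byte schema ID and then the Avro-encoded payload, so a plain JSON string fails at the very first byte. One way to inspect the raw bytes of a message, assuming kcat is installed and the broker listens on localhost:9092:

    kcat -C -b localhost:9092 -t testtopic -c 1 -f '%s' | xxd | head -n 1

An Avro-encoded value starts with 00; a plain JSON string starts with 7b (the '{' character).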
1 Answer
Summarizing the comments:
Use kafka-avro-console-producer or a similar tool to actually produce Avro. As written in the question, you are only producing plain JSON strings, and that does not work with the AvroConverter. Alternatively, you could use the JsonConverter, but you would then need to modify the produced data so that it embeds the schema as part of each record. Sketches of both approaches follow.
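A minimal sketch of the first approach, assuming the Confluent CLI tools are installed and the broker listens on localhost:9092 (older releases use --broker-list instead of --bootstrap-server):

    kafka-avro-console-producer \
      --bootstrap-server localhost:9092 \
      --topic testtopic \
      --property schema.registry.url=http://localhost:8081 \
      --property value.schema='{"type":"record","name":"sampleTest","fields":[{"name":"testid","type":"string"},{"name":"testname","type":"string"}]}'

Each line typed afterwards, e.g. {"testid": "226", "testname": "testSample21"}, is then serialized to Avro against the given schema before being sent.

For the JsonConverter alternative ("value.converter": "org.apache.kafka.connect.json.JsonConverter" with "value.converter.schemas.enable": true), every message must carry its schema in an envelope along these lines:

    {
      "schema": {
        "type": "struct",
        "name": "sampleTest",
        "optional": false,
        "fields": [
          {"field": "testid", "type": "string", "optional": false},
          {"field": "testname", "type": "string", "optional": false}
        ]
      },
      "payload": {"testid": "226", "testname": "testSample21"}
    }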