配置apache kafka接收器jdbc连接器

w6lpcovy  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(455)

我想将发送到主题的数据发送到postgresql数据库。因此,我遵循此指南,将属性文件配置为:

name=transaction-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=transactions
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=db-user
connection.password=
auto.create=true
insert.mode=insert
table.name.format=transaction
pk.mode=none

我开始连接

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-postgresql.properties

接收器连接器已创建,但由于以下错误而未启动:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

该模式是avro格式的,并且已注册,我可以向主题发送(生成)消息并从中读取(使用)。但我好像不能把它送到数据库。
这是我的 ./etc/schema-registry/connect-avro-standalone.properties ```
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

这是一个使用java api提供主题的生产者:

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

try (KafkaProducer<String, Transaction> producer = new KafkaProducer<>(properties)) {
Transaction transaction = new Transaction();
transaction.setFoo("foo");
transaction.setBar("bar");
UUID uuid = UUID.randomUUID();
final ProducerRecord<String, Transaction> record = new ProducerRecord<>(TOPIC, uuid.toString(), transaction);
producer.send(record);
}

我正在验证数据是否使用

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092
--property schema.registry.url=http://localhost:8081
--topic transactions
--from-beginning --max-messages 1

数据库已启动并运行。
gojuced7

gojuced71#

这是不正确的:
未知的魔法字节可能是由于一个id字段而不是架构的一部分
这个错误意味着主题上的消息没有使用模式注册表avro serialiser序列化。
你是如何把数据放在这个主题上的?
也许所有的消息都有问题,也许只有一些,但默认情况下,这将停止Kafka连接任务。
你可以设置

"errors.tolerance":"all",

使其忽略无法反序列化的消息。但是如果它们都没有正确的avro序列化,那么这将没有帮助,您需要正确地序列化它们,或者选择一个不同的转换器(例如,如果它们实际上是json,请使用jsonconverter)。
这些参考资料将对您有更多帮助:
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
http://rmoff.dev/ksldn19-kafka-connect
编辑:
如果要序列化密钥 StringSerializer 然后您需要在连接配置中使用此选项:

key.converter=org.apache.kafka.connect.storage.StringConverter

您可以在辅助进程上设置它(全局属性,应用于在其上运行的所有连接器),或仅针对此连接器(即,将其放入连接器属性本身,它将覆盖辅助进程设置)

相关问题