我创建了一个nifi流,它最终将json记录发布为带有avro编码值和字符串键的记录,使用confluent registry中的模式作为值模式。以下是nifi中avrorecordsetwriter的配置。
我现在正在尝试使用Kafka连接( connect-standalone
)使用jdbcsinkconnector将消息移动到postgresql数据库,但出现以下错误:检索id 1的avro架构时出错
我已经确认我的汇合注册表中有一个模式,id为1。下面是我对连接任务的配置
辅助进程配置:
bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java
连接器配置:
name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true
我在nifi中创建了一个正确使用消息的流,并且通过指定 --property schema.registry.url=http://schema-registry:8081
. 请注意,我在docker容器中运行使用者,这就是url不是localhost的原因。
我不确定我错过了什么。我唯一的想法是我使用了错误的类作为密钥转换器,但是对于给定的错误,这是没有意义的。有人能看出我做错了什么吗?
1条答案
按热度按时间2exbekwf1#
我不太了解nifi,但是我看到模式的名称是“rds”,在错误日志中,它说它没有在模式注册表中找到主题。
Kafka的使用
KafkaAvroSerializer
序列化avro记录,同时在模式注册表中注册相关的avro模式。it使用KafkaAvroDeserializer
反序列化avro记录并从schema注册表检索关联的schema。schema registry将schema存储到名为“subjects”的类别中,为记录命名subject的默认行为是:
topic_name-value
对于值记录和topic_name-key
为了钥匙。在您的例子中,您没有向kafka注册模式,而是向nifi注册模式,因此我猜测名称“rds”将出现在模式注册表中或是模式注册表上的主题名称。
您是如何验证您的架构是否正确存储的?
通常情况下,正确的主题是
rds-value
因为您只在值记录上使用模式注册表。