我正在尝试创建一个使用protobuf值转换器的kafka接收器连接器。我已经有了一个使用json的配置版本,但是我现在需要将其更改为使用protobuf消息。
我正在尝试使用以下请求创建连接器:
curl -X POST localhost:8083/connectors -H "Content-Type: application/json" -d '
{
"name": "jdbc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "TEST_PROTO",
"connection.url": "${DB_URL}",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"auto.create": true,
"auto.evolve": true,
"type": "sink",
"connection.user": "{DB_USER}",
"connection.password": "${DB_PASS}"
}
}
这将产生以下错误消息:
Invalid value io.confluent.connect.protobuf.ProtobufConverter for configuration value.converter: Class io.confluent.connect.protobuf.ProtobufConverter could not be found
我不太明白为什么我不能在这里包括这个。据我所见,文档表明这是一个适当的值:https://docs.confluent.io/current/connect/userguide.html
有人能帮忙吗?
1条答案
按热度按时间4szc88ey1#
我猜在这种情况下,您缺少以下配置:
value.converter.schema.registry.url
key.converter.schema.registry.url
key.converter.schemas.enable启用
值.converter.schemas.enable
除此之外,我还尝试使用jdbc的最新jar和confluent平台的最新版本。如果这样不行,请告诉我。