Kafka Connect HDFS sink connector fails even though the JSON data contains schema and payload fields

vs3odd8k · published 2021-06-08 in Kafka

I am trying to move JSON data from Kafka to HDFS using the Kafka Connect HDFS sink connector.
Even though the JSON data in Kafka has a schema and a payload, the Kafka Connect task fails with the error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields.

Data in Kafka:

./bin/kafka-console-consumer.sh --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning

{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}
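For reference, records in this envelope format can be assembled like this (a minimal sketch; the field names and `Person` schema are taken from the sample messages above):

```python
import json

def person_record(name, company):
    """Build the schema + payload envelope that JsonConverter expects
    when value.converter.schemas.enable=true."""
    return json.dumps({
        "schema": {
            "type": "struct",
            "fields": [
                {"type": "string", "optional": False, "field": "Name"},
                {"type": "string", "optional": False, "field": "company"},
            ],
            "optional": False,
            "name": "Person",
        },
        "payload": {"Name": name, "company": company},
    })
```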

The HDFS sink job was submitted with the following command:

curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-cluster-15may-308pm", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"}}' http://localhost:8083/connectors

Distributed Kafka Connect worker configuration:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Error message:
http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status

{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
    "id": 0,
    "worker_id": "127.0.0.1:8083"
}

yqyhoc1h1#

What version of Kafka Connect are you using? It helps to know that when tracing the source of the error from the stack trace.
I think what is happening is that you have data in the value but not in the key. Since you have both key.converter and value.converter set to JsonConverter with schemas.enable=true, it expects to see the "schema" and "payload" envelope for both. My guess, though, is that your keys are null.
This is something of the inverse of https://issues.apache.org/jira/browse/kafka-3832, where JsonConverter never generates true null values; instead, it always generates the envelope containing the expected optional schema plus a null payload. In your case, the opposite direction, converting data from Kafka into Connect's data API, does not work, because it expects that same envelope format in the key.
You can verify this by adding --property print.key=true to your console consumer command. If it prints out null keys, the problem is that JsonConverter cannot decode them.
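To illustrate why a null key trips the converter while the values above pass, here is a rough Python sketch of the envelope check (an approximation for illustration, not the actual JsonConverter source):

```python
import json

ENVELOPE_FIELDS = {"schema", "payload"}

def check_envelope(raw):
    """Mimic the check behind the DataException: with schemas.enable=true,
    each key/value must be a JSON object with exactly a 'schema' and a
    'payload' field; anything else (including a literal null) is rejected."""
    obj = json.loads(raw)
    if not (isinstance(obj, dict) and set(obj) == ENVELOPE_FIELDS):
        raise ValueError(
            'JsonConverter with schemas.enable requires "schema" and "payload" fields'
        )
    return obj["payload"]
```

A value from the topic returns its payload, while a bare null key raises, mirroring the stack trace above.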
An easy fix is to use some other Converter for keys, one that does not care about null values, since there is no data in the key anyway. The one that ships with Kafka Connect is org.apache.kafka.connect.storage.StringConverter.
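Assuming your Connect version supports per-connector converter overrides, the fix can be applied directly in the submission JSON (a sketch; everything except key.converter is copied from the original curl request). Otherwise, set key.converter in the worker properties instead:

```json
{
  "name": "connect-cluster-15may-308pm",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "2",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "1",
    "topics": "test_hdfs_json_schema_payload_1",
    "topics.dir": "/deepak/kafka-connect/trial1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```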
