I am trying to move JSON data from Kafka to HDFS using the Kafka Connect HDFS sink connector.
Even though the JSON data in Kafka includes both a schema and a payload, the Kafka Connect task fails with the following error:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
Data in Kafka:
./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}
The HDFS sink job was submitted with the following command:
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "connect-cluster-15may-308pm",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "2",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "1",
    "topics": "test_hdfs_json_schema_payload_1",
    "topics.dir": "/deepak/kafka-connect/trial1"
  }
}' http://localhost:8083/connectors
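To confirm the connector was registered, the worker's REST API can be queried (assuming the worker listens on localhost:8083, as in the command above):

curl http://localhost:8083/connectors

This should return a JSON array that includes connect-cluster-15may-308pm.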
Distributed Kafka Connect worker configuration:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Error message:
http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status
{
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
"id": 0,
"worker_id": "127.0.0.1:8083"
}
1 Answer
What version of Kafka Connect are you using? Knowing this helps when tracing the source of errors from the stack trace.
I think what is happening is that you have data in your values, but not in your keys. Since you have both key.converter and value.converter set to JsonConverter with schemas.enable=true, it expects to see the envelope containing schema and payload for both the key and the value. However, I'm guessing your keys are all null.
This is sort of the inverse of the problem in https://issues.apache.org/jira/browse/kafka-3832, where JsonConverter never generates true null values; instead, it always generates the envelope containing the optional schema plus a null payload. In this case, converting from Kafka to Connect's data API does not work, because it expects that same envelope format in the keys.
You can verify that this is the problem by adding --property print.key=true to your console consumer command. If it prints out null values, the issue is that the JsonConverter cannot decode them.
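For example, the consumer command from the question with key printing enabled would look like this (same topic and ZooKeeper address as above):

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning --property print.key=true

If each line starts with null followed by the JSON value, the keys are indeed empty.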
An easy workaround is to use some other Converter for the keys, one that does not care about null values, since there is no data in the keys anyway. One that ships with Kafka Connect is org.apache.kafka.connect.storage.StringConverter.
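A minimal sketch of the corresponding worker configuration change (both converter classes ship with Kafka Connect; values keep the JSON envelope handling since the data above does carry schema and payload):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Restart the Connect worker after changing these settings so the new key converter takes effect.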