I have a DataFrame with the following schema:
root
|-- sentence: string (nullable = true)
|-- category: string (nullable = true)
I am successfully writing the above DataFrame to a Kafka topic (s3.topic). Next I want to read s3.topic from Kafka and store the data in an S3 bucket. For this I am using the Kafka Connect S3 sink connector. I made the following configuration changes in the connect-distributed.properties file:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
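For reference, when schemas.enable=true the JsonConverter expects every message value to be wrapped in a schema/payload envelope roughly like the one below (a hypothetical record matching the schema above); with schemas.enable=false it expects only the plain JSON payload:

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "sentence", "type": "string", "optional": true},
      {"field": "category", "type": "string", "optional": true}
    ],
    "optional": false
  },
  "payload": {"sentence": "some text", "category": "some category"}
}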
Below is the curl command I use to create the connector that writes to the S3 bucket:
curl -X POST \
localhost:8084/connectors \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-d '{
  "name": "S3SinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "<region>",
    "flush.size": "1000",
    "topics": "s3.topic",
    "tasks.max": "1",
    "aws.secret.access.key": "<key>",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "aws.access.key.id": "<keyId>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "<s3_bucket_name>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'
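(After this POST, the connector and task state can be inspected via the Connect REST API on the same worker, e.g.:

curl localhost:8084/connectors/S3SinkConnector/status
)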
When the connector runs, I get the following error:
ERROR WorkerSinkTask{id=S3SinkConnector904-0} Error converting message value in topic 'emoji.analysis009' partition 0 at offset 0 and timestamp 1617585788233: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
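Note that setting value.converter in the connector config overrides the worker-level converter, and the connector-level converter does not inherit the worker's schemas.enable=false (it defaults back to true). Following the error message's own suggestion, one plausible fix (an assumption, not verified here) would be to set it explicitly in the connector config:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",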
How can I write the topic data to the S3 bucket given the printSchema output shown above?
UPDATE
Below is the printSchema() output of the DataFrame that is written to the Kafka topic:
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
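For context, here is a minimal sketch of how such a key/value DataFrame might be written to the topic (Scala, Spark structured API; the df variable and the broker address are assumptions, not taken from the question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaWriter").getOrCreate()
// df: DataFrame with string columns "key" and "value", matching the schema above
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("topic", "s3.topic")
  .save()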