Reading a Kafka topic and writing to S3 with the Kafka S3 sink connector

omhiaaxx · posted 2021-07-09 · tagged Spark

I have a DataFrame with the following schema:

root
 |-- sentence: string (nullable = true)
 |-- category: string (nullable = true)

I am successfully writing the above DataFrame to a Kafka topic (s3.topic). Next I want to consume s3.topic from Kafka and store the data in an S3 bucket, using the Kafka Connect S3 sink connector. I made the following configuration changes in the connect-distributed.properties file (a sketch of the Spark write side follows these settings):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
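
For context, here is a minimal sketch of how such a DataFrame might be produced and written to s3.topic. The question does not show the actual write code, so the source DataFrame df, the choice of message key, and the broker address are assumptions:

from pyspark.sql import functions as F

# Assumed source: df with the `sentence` and `category` columns shown above.
kafka_df = df.select(
    F.col("category").alias("key"),                              # message key (assumption)
    F.to_json(F.struct("sentence", "category")).alias("value"),  # row serialized as plain JSON
)

(kafka_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # assumed broker address
    .option("topic", "s3.topic")
    .save())

Written this way, each message value is plain JSON without a schema/payload envelope, which matches the key/value schema shown in the update at the end of the question.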

Below is the curl command I use to create the S3 sink connector:

curl -X POST \
  localhost:8084/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
    "name": "S3SinkConnector",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "s3.region": "<region>",
      "flush.size": "1000",
      "topics": "s3.topic",
      "tasks.max": "1",
      "aws.secret.access.key": "<key>",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "aws.access.key.id": "<keyId>",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.bucket.name": "<s3_bucket_name>",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }'
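
Once the connector is created, its state can be checked through the Kafka Connect REST API (assuming the same port as above); a failed task's stack trace appears in the trace field of the response:

curl localhost:8084/connectors/S3SinkConnector/status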

I get the following error:
ERROR WorkerSinkTask{id=S3SinkConnector904-0} Error converting message value in topic 'emoji.analysis009' partition 0 at offset 0 and timestamp 1617585788233: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
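
For reference, with schemas.enable=true the JsonConverter expects every message value to be wrapped in the schema/payload envelope the error describes. For the sentence/category rows it would look roughly like this (sample values invented for illustration):

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "sentence", "type": "string", "optional": true},
      {"field": "category", "type": "string", "optional": true}
    ],
    "optional": false
  },
  "payload": {
    "sentence": "some example text",
    "category": "some label"
  }
}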
How can I write this topic data (with the schema shown in the printSchema output above) to the S3 bucket?
Update
Below is the printSchema() output of the DataFrame that is written to the Kafka topic:

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
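
Given this schema, each message value is presumably the row serialized as plain JSON, e.g. (invented sample):

{"sentence": "some example text", "category": "some label"}

That is exactly the "plain JSON data" case the error message refers to: there are no "schema" and "payload" fields for the JsonConverter to find.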
