I generate data with Java, put it into a Kafka topic, and then want to sink that data into MongoDB. When I send the data from Java as JSON, it is not stored in MongoDB because of this error:
[2020-08-15 18:42:19,164] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JSON reader was expecting a value but found 'siqdj'. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
at java.util.HashMap.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2020-08-15 18:42:19,166] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
at java.util.HashMap.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
... 10 more
This is the data sent by my Java program, as it appears in the Kafka consumer:
{"name":"This is a test","dept":"siqdj","studentId":1}
{"name":"This is another","dept":"siqdj","studentId":2}
Each line represents one record.
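Since the producer code itself is not shown, here is a minimal sketch of what a producer emitting those two records might look like (the serializer choice and the class name are assumptions; the topic name TestTopic4 is taken from the connector config below):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StudentProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Plain string serializers: each record value is the raw JSON text.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One JSON document per record, matching the two lines above.
            producer.send(new ProducerRecord<>("TestTopic4",
                    "{\"name\":\"This is a test\",\"dept\":\"siqdj\",\"studentId\":1}"));
            producer.send(new ProducerRecord<>("TestTopic4",
                    "{\"name\":\"This is another\",\"dept\":\"siqdj\",\"studentId\":2}"));
        }
    }
}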
These are my configuration files:
connect-standalone.properties
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/plugins
mongosinkconnector.properties
name=Kafka_ops
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=TestTopic4
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=student_kafka
collection=students
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
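These two files are handed to the standalone Connect worker at startup, e.g. (assuming the standard Kafka distribution layout; adjust the paths to where your files actually live):

bin/connect-standalone.sh connect-standalone.properties mongosinkconnector.properties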
1 Answer
Tariq - I am not an expert on this, but I have tried something similar with the JDBC sink adapter and an Oracle database. The format of the data you are sending to the topic looks incorrect to me, which is probably why you are getting the error. Since you are using the JsonConverter, each record in the topic should be in the following format for the sink adapter to parse it and write it to the data store. Currently your data carries no schema with the payload, hence the error. Try sending the content below to the topic and see whether it sinks into MongoDB.
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"dept"},{"type":"int64","optional":true,"field":"studentId"}],"optional":false,"name":"your_table_name"},"payload":{"name":"This is a test","dept":"siqdj","studentId":1}}
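Purely as a sketch (the answer does not include producer code), the envelope above could be assembled on the Java side with a small helper added to the StudentProducer sketch earlier; toEnvelope is a hypothetical name, and your_table_name is the placeholder from the JSON:

// Hypothetical helper: wraps one student record in the schema/payload
// envelope that JsonConverter expects when schemas are enabled.
static String toEnvelope(String name, String dept, long studentId) {
    return "{\"schema\":{\"type\":\"struct\",\"fields\":["
         + "{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},"
         + "{\"type\":\"string\",\"optional\":true,\"field\":\"dept\"},"
         + "{\"type\":\"int64\",\"optional\":true,\"field\":\"studentId\"}],"
         + "\"optional\":false,\"name\":\"your_table_name\"},"
         + "\"payload\":{\"name\":\"" + name + "\",\"dept\":\"" + dept
         + "\",\"studentId\":" + studentId + "}}";
}

Note that the JsonConverter only interprets this schema/payload envelope when value.converter.schemas.enable is set to true; with schemas disabled, as in the connector config above, it expects bare JSON documents instead.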