kafka/pubsub连接器:示例管道:错误任务转换字节[],无法识别的标记,应为('true'、'false'或'null')

rjjhvcjd  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(493)

我正在使用 kafka_2.11-0.10.2.1 以及谷歌提供的pubsub连接器。我只想用一个独立的连接器将数据从Kafka主题推送到pubsub主题。我按照我应该遵循的所有步骤:
生产 cps-kafka-connector.jar 添加了 cps-sink-connector.properties Kafka的档案 config 目录。文件如下所示:

name=CPSConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
topics=kafka_topic
cps.topic=pubsub_topic
cps.project=my_gcp_project_12345

我确保没有启用任何转换器 connect-standalone.properties :

key.converter.schemas.enable=false
value.converter.schemas.enable=false

我创建了一个主题 kafka_topic 并发送了如下消息:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic
$ hello streams
$ kafka streams rock

我按如下方式运行连接器:

$ bin/connect-standalone.sh config/connect-standalone.properties config/cps-sink-connector.properties

目的是为了逃跑:

$ gcloud beta pubsub subscriptions pull subscription_to_pubsub_topic

收集这些信息。但是,发生了以下错误,我无法绕过它们。有什么想法吗?我使用了错误的输入吗?正确的样本输入是什么?

[2017-05-04 17:34:40,898] INFO Discovered coordinator 10.33.19.146:9092 (id: 2147483647 rack: null) for group connect-CPSConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
  [2017-05-04 17:34:40,899] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
  [2017-05-04 17:34:40,900] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
  [2017-05-04 17:34:40,936] ERROR Task CPSConnector-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
  org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@3c06c37d; line: 1, column: 11]
  Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@3c06c37d; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  [2017-05-04 17:34:40,941] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
  [2017-05-04 17:34:43,837] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
  [2017-05-04 17:34:43,838] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,847] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,847] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,848] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,853] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [test8-0] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,856] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,846] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,854] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,862] ERROR Task CPSConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
  org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@32a6e3e6; line: 1, column: 11]
  Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@32a6e3e6; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    ...
vqlkdk9b

vqlkdk9b1#

这些线 connect-standalone.properties 不要禁用转换器:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

它们禁止包含带有特定转换器(如json转换器)的模式。您感兴趣的线路有:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

这个 key.converter 以及 value.converter 字段分别指示Kafka消息的键和值中的数据格式。由于您发布的消息不是有效的json,因此会看到此错误。您需要将这些转换器设置为stringconverter:

key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter

相关问题