我正在尝试为google云pubsub服务配置sink kafka connect。
使用以下命令配置kafka connect:
curl
-X POST
-H 'Content-Type: application/json'
-H 'Accept: application/json' -d '{ "name": "pubsub_test",
"config": { "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
"tasks.max": "1",
"topics": "kafka_test_topic",
"cps.topic": "cps_test_topic",
"cps.project": "cps_test_project" } }' http://localhost:8083/connectors
在状态中,我有以下消息:
{"name":"pubsub_test","connector":
{"state":"RUNNING","worker_id":"connect:8083"},
"tasks":[{"state":"FAILED","trace":"org.apache.kafka.common.config.ConfigException:
Unknown configuration 'errors.deadletterqueue.topic.name'\n\tat org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:91)\n\tat org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig.get(ConnectorConfig.java:117)\n\tat org.apache.kafka.connect.runtime.ConnectorConfig.get(ConnectorConfig.java:162)\n\tat org.apache.kafka.common.config.AbstractConfig.getString(AbstractConfig.java:126)\n\tat org.apache.kafka.connect.runtime.Worker.sinkTaskReporters(Worker.java:531)\n\tat org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:508)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:451)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n","id":0,"worker_id":"connect:8083"}],"type":"sink"}
1条答案
按热度按时间hts6caw31#
看起来这并不是云pub/sub连接器所特有的,而是kafka的一个普遍问题。也许你运行的Kafka版本没有修复程序。