Kafka Connect JDBC sink to Oracle task fails

c9qzyr3d · asked 2021-06-07 · in Kafka

I am trying to sink data from a Kafka Avro topic into an existing Oracle database table that I created beforehand. I am running Kafka Connect in distributed mode (3 workers). When I submit a new connector through the REST API, the connector and a new task are created, but the task fails immediately, and I cannot figure out why. The task error and my configuration are below.
Task error

  1. {"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
  2. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
  3. at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
  4. at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
  5. at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
  6. at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
  7. at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
  8. at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
  9. at java.util.concurrent.FutureTask.run(Unknown Source)
  10. at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
  11. at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
  12. at java.lang.Thread.run(Unknown Source)","id":0,"worker_id":"kafka_host02:8083"}

Kafka Connect worker configuration

    bootstrap.servers=kafka_host01:9092,kafka_host02:9092,kafka_host03:9092
    group.id=connect-cluster-00
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://kafka_host01:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://kafka_host01:8081
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    offset.flush.interval.ms=10000
    rest.port=8083

Connector submission command

    curl -X POST -H "Content-Type: application/json" --data '{
      "name": "heat-predict-ora-sink",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "MY-TEST-02_topic",
        "connection.user": "scott",
        "connection.password": "tiger",
        "connection.url": "jdbc:oracle:thin:@orahost.localdomain:1521:orcl",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "insert.mode": "insert",
        "batch.size": "0",
        "table.name.format": "TEST.MY_TABLE_IN",
        "pk.mode": "none",
        "pk.fields": "none"
      }
    }' http://localhost:8083/connectors
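Unrelated to the eventual root cause, but with a hand-written `--data` payload this long it is easy to break the JSON with a stray quote, which produces confusing REST errors. A minimal sketch of sanity-checking the payload before POSTing; the payload below is a trimmed stand-in for the full one:

```shell
# Validate a hand-written connector payload before POSTing it to Connect.
# This trimmed payload stands in for the full one from the question.
payload='{"name": "heat-predict-ora-sink", "config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "MY-TEST-02_topic"}}'
echo "$payload" | python3 -m json.tool > /dev/null && echo "payload OK"
```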

Can anyone tell me why this happens?

ruarlubt1#

Based on https://github.com/confluentinc/kafka-connect-jdbc/issues/221 , I suggest you connect to Oracle as the user that owns the schema you want to write to, instead of specifying `schema.table` in the table name.
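Concretely, that suggestion amounts to changing two fields in the connector config: connect as the schema owner and drop the schema prefix from the table name. A sketch, assuming the target schema is `TEST` as in the question (the password value is a placeholder):

```json
{
  "connection.user": "TEST",
  "connection.password": "<password of the TEST user>",
  "table.name.format": "MY_TABLE_IN"
}
```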

hjqgdpho2#

I finally managed to write to the RDBMS once I allowed the Kafka Connect user to auto-create the table in its own schema. Oddly, the column names in the new table are wrapped in quotes, but that is not a show-stopper. Thanks to everyone who tried to help answer this question!
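The quoted column names come from the connector quoting identifiers in the DDL it generates during auto-create; with quoted identifiers, Oracle stores the names case-sensitively and they must be quoted in ad-hoc SQL too. Newer versions of the JDBC sink connector expose a `quote.sql.identifiers` setting that controls this behavior; a sketch, assuming a connector version that supports the setting (unquoted names are then folded to upper case by Oracle):

```json
{
  "auto.create": "true",
  "quote.sql.identifiers": "never"
}
```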
