Kafka: Unable to run a query with a WHERE clause in the MySQL JDBC connector

Posted by a6b3iqyw on 2023-03-06 in Java

JSON file:

{
  "name": "mysql-jdbc",
  "config": {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"  : "jdbc:mysql://mysqldb:3306/silicon",
    "connection.user" : "root",
    "connection.password" : "root",
    "mode"            : "incrementing",
    "incrementing.column.name": "id",
    "query": "select * from (select * from credit_lines where id=2) test;",
    "topic.prefix"    : "JDBC.test_db_test",
    "validate.non.null"       : "false",
    "poll.interval.ms"        : "1000"
  }
}

Table data:

The query as it appears in the logs:

connect  |  (org.apache.kafka.connect.runtime.tracing.TracerConfig)
connect  | [2023-03-01 19:42:30,132] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker)
connect  | [2023-03-01 19:42:30,133] INFO [Worker clientId=connect-1, groupId=jdbc_source_connector] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
connect  | [2023-03-01 19:42:30,133] INFO Starting JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
connect  | [2023-03-01 19:42:30,133] INFO JdbcSourceTaskConfig values: 
connect  |      batch.max.rows = 100
connect  |      catalog.pattern = null
connect  |      connection.attempts = 3
connect  |      connection.backoff.ms = 10000
connect  |      connection.password = [hidden]
connect  |      connection.url = jdbc:mysql://mysqldb:3306/silicon
connect  |      connection.user = root
connect  |      db.timezone = UTC
connect  |      dialect.name = 
connect  |      incrementing.column.name = id
connect  |      mode = incrementing
connect  |      numeric.mapping = null
connect  |      numeric.precision.mapping = false
connect  |      poll.interval.ms = 1000
connect  |      query = select * from (select * from credit_lines where id=2) test;
connect  |      query.retry.attempts = -1
connect  |      query.suffix = 
connect  |      quote.sql.identifiers = ALWAYS
connect  |      schema.pattern = null
connect  |      table.blacklist = []
connect  |      table.monitoring.startup.polling.limit.ms = 10000
connect  |      table.poll.interval.ms = 60000
connect  |      table.types = [TABLE]
connect  |      table.whitelist = []
connect  |      tables = []
connect  |      timestamp.column.name = []
connect  |      timestamp.delay.interval.ms = 0
connect  |      timestamp.granularity = connect_logical
connect  |      timestamp.initial = null
connect  |      topic.prefix = JDBC.test_db_test
connect  |      transaction.isolation.mode = DEFAULT
connect  |      validate.non.null = false
connect  |  (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)

The error I get:

ERROR WorkerSourceTask{id=mysql-jdbc-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect  | org.apache.kafka.connect.errors.ConnectException: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1
connect  |      at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:452)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:470)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:349)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
connect  |      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect  |      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect  |      at java.base/java.lang.Thread.run(Thread.java:829)
connect  | Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1


Answer 1 (qcuzuvrc)

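You are only looking up id=2, so mode = incrementing does not really make sense for this query: in that mode the connector always appends ORDER BY id ASC to it.
If you always want to fetch id = 2 and keep picking up updates to that row, you probably want mode = bulk.
Or, at the very least, do not end the query with a semicolon, because WHERE id > -1 ORDER BY id ASC is appended after it in incrementing mode.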
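
As a rough sketch of the mode = bulk suggestion, the config from the question could be adapted along these lines. This is an untested variant built on assumptions: it drops incrementing.column.name and validate.non.null (which only matter for the incrementing and timestamp modes), uses the inner query directly without a trailing semicolon, and keeps every other value from the question unchanged.

```json
{
  "name": "mysql-jdbc",
  "config": {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"  : "jdbc:mysql://mysqldb:3306/silicon",
    "connection.user" : "root",
    "connection.password" : "root",
    "mode"            : "bulk",
    "query": "select * from credit_lines where id=2",
    "topic.prefix"    : "JDBC.test_db_test",
    "poll.interval.ms"        : "1000"
  }
}
```

In bulk mode the connector re-runs the whole query on each poll, so with poll.interval.ms at 1000 the same row would likely be published again about once a second whether or not it changed. A larger interval may be more appropriate.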


Answer 2 (uidvcgyl)

Remove the semicolon from the query. After that, it started working.
Working config:

{
  "name": "mysql-jdbc",
  "config": {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"  : "jdbc:mysql://mysqldb:3306/silicon",
    "connection.user" : "root",
    "connection.password" : "root",
    "mode"            : "incrementing",
    "incrementing.column.name": "id",
    "query": "select * from (select * from credit_lines where id=2) test",
    "topic.prefix"    : "JDBC.test_db_test",
    "validate.non.null"       : "false",
    "poll.interval.ms"        : "1000"
  }
}
