JSON文件:
{
"name": "mysql-jdbc",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://mysqldb:3306/silicon",
"connection.user" : "root",
"connection.password" : "root",
"mode" : "incrementing",
"incrementing.column.name": "id",
"query": "select * from (select * from credit_lines where id=2) test;",
"topic.prefix" : "JDBC.test_db_test",
"validate.non.null" : "false",
"poll.interval.ms" : "1000"
}
}
表格数据:
日志中的查询:
connect | (org.apache.kafka.connect.runtime.tracing.TracerConfig)
connect | [2023-03-01 19:42:30,132] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker)
connect | [2023-03-01 19:42:30,133] INFO [Worker clientId=connect-1, groupId=jdbc_source_connector] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
connect | [2023-03-01 19:42:30,133] INFO Starting JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
connect | [2023-03-01 19:42:30,133] INFO JdbcSourceTaskConfig values:
connect | batch.max.rows = 100
connect | catalog.pattern = null
connect | connection.attempts = 3
connect | connection.backoff.ms = 10000
connect | connection.password = [hidden]
connect | connection.url = jdbc:mysql://mysqldb:3306/silicon
connect | connection.user = root
connect | db.timezone = UTC
connect | dialect.name =
connect | incrementing.column.name = id
connect | mode = incrementing
connect | numeric.mapping = null
connect | numeric.precision.mapping = false
connect | poll.interval.ms = 1000
connect | query = select * from (select * from credit_lines where id=2) test;
connect | query.retry.attempts = -1
connect | query.suffix =
connect | quote.sql.identifiers = ALWAYS
connect | schema.pattern = null
connect | table.blacklist = []
connect | table.monitoring.startup.polling.limit.ms = 10000
connect | table.poll.interval.ms = 60000
connect | table.types = [TABLE]
connect | table.whitelist = []
connect | tables = []
connect | timestamp.column.name = []
connect | timestamp.delay.interval.ms = 0
connect | timestamp.granularity = connect_logical
connect | timestamp.initial = null
connect | topic.prefix = JDBC.test_db_test
connect | transaction.isolation.mode = DEFAULT
connect | validate.non.null = false
connect | (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
我收到的错误:
ERROR WorkerSourceTask{id=mysql-jdbc-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect | org.apache.kafka.connect.errors.ConnectException: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1
connect | at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:452)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:470)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:349)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1
2条答案
按热度按时间qcuzuvrc1#
您只搜索
id=2
,因此mode = incrementing
对于query
实际上没有意义,因为它总是将ORDER BY ID ASC
附加到查询中。如果您总是想搜索
id = 2
,并不断更新该行,那么您可能需要mode = bulk
或者,至少不能用分号结束查询,因为
WHERE id > -1 ORDER BY id ASC
是以递增模式添加的。uidvcgyl2#
从查询中删除分号。之后,它开始工作
工作代码: