无法使用com.mysql.jdbc.exceptions.jdbc4.mysqlsyntaxerrorexception接收对mysql的更改:您的sql语法有错误;

gzszwxb4  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(355)

我使用debezium获取数据事件,kafka connect jdbc接收更改
这是我正在使用的dockerfile mysql-connector-java-5.1.39.jar ```
FROM debezium/connect:1.1
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ENV MYSQL_DRIVER_VERSION 5.1.39

ARG KAFKA_JDBC_VERSION=5.3.1

RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz"
| tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar

RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

这是我的mysql数据库源代码:

docker run -it --rm --name mysqltes -p 3308:3306
-e MYSQL_ROOT_PASSWORD=debezium
-e MYSQL_USER=mysqluser
-e MYSQL_PASSWORD=mysqlpw
debezium/example-mysql:1.1

这是我的第二个mysql数据库,必须对其进行更改:

docker run -it --rm --name mysql -p 3307:3306
-e MYSQL_ROOT_PASSWORD=debezium
-e MYSQL_USER=mysqluser
-e MYSQL_PASSWORD=mysqlpw
debezium/example-mysql:1.1

这是我的源mysql配置:

curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" 192.168.99.102:8083/connectors/
-d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysqltes",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
}
}'

这是我的接收器连接器配置

curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" 192.168.99.102:8083/connectors/
-d '{
"name": "inventory-connector-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql:3306/inventory?useSSL=false",
"connection.user": "debezium",
"connection.password": "dbz",
"topics": "dbserver1.inventory.customers",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
}
}'

我正面临这个问题我不知道为什么要帮我解决这个问题

2020-06-20 13:36:58,299 INFO || Attempting to open connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,352 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2020-06-20 13:36:58,415 INFO || Checking MySql dialect for existence of table "dbserver1"."inventory"."customers" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,423 INFO || Using MySql dialect table "dbserver1"."inventory"."customers" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,428 INFO || Creating table with sql: CREATE TABLE dbserver1.inventory.customers (
last_name VARCHAR(256) NOT NULL,
id INT NOT NULL,
first_name VARCHAR(256) NOT NULL,
email VARCHAR(256) NOT NULL,
PRIMARY KEY(id)) [io.confluent.connect.jdbc.sink.DbStructure]
2020-06-20 13:36:58,443 WARN || Create failed, will attempt amend if table already exists [io.confluent.connect.jdbc.sink.DbStructure]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.customers (
last_name VARCHAR(256) NOT NULL,
id INT NOT NULL,
first_name' at line 1 at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) at com.mysql.jdbc.Util.handleNewInstance(Util.java:404) at com.mysql.jdbc.Util.getInstance(Util.java:387) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545) at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540) at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595) at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468) at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076) at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93) at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61) at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121) at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66) at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 2020-06-20 13:36:58,508 WARN || Write of 4 records failed, remainingRetries=7 [io.confluent.connect.jdbc.sink.JdbcSinkTask] com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.customers(last_nameVARCHAR(256) NOT NULL,idINT NOT NULL,first_name' at line 1
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,562 INFO || Closing connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,582 INFO || Initializing writer using SQL dialect: MySqlDatabaseDialect [io.confluent.connect.jdbc.sink.JdbcSinkTask]
2020-06-20 13:36:58,584 ERROR || WorkerSinkTask{id=inventory-connector-sink-0} RetriableException from SinkTask: [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.customers (
last_name VARCHAR(256) NOT NULL,
id INT NOT NULL,
`first_name' at line 1

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.customers (
last_name VARCHAR(256) NOT NULL,
id INT NOT NULL,
`first_name' at line 1

    ... 12 more
ni65a41a

ni65a41a1#

尝试添加 "quote.sql.identifiers": "never" 属性设置为接收器连接器配置。在这种情况下 CREATE TABLE 声明应为:

CREATE TABLE dbserver1.inventory.customers (
  last_name VARCHAR(256) NOT NULL,
  id INT NOT NULL,
  first_name VARCHAR(256) NOT NULL,
  email VARCHAR(256) NOT NULL,
  PRIMARY KEY(id)
)

p、 它看起来像一个解决方法,也许jdbc连接器中有一个bug。

相关问题