连接flink和mysql数据库时sql所需的上下文属性不匹配

htzpubme  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(360)

我正在使用flink最新版本(1.11.2)处理一个示例mysql数据库,该数据库运行良好。
此外,我还向{flink}/lib添加了flink-connector-jdbc2.11-1.11.2、mysql-connector-java-8.0.21.jar、postgresql-42.2.17.jar
这是我的密码

  1. T_CONFIG = TableConfig()
  2. B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
  3. B_EXEC_ENV.set_parallelism(1)
  4. BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)
  5. ddl = """
  6. CREATE TABLE nba_player4 (
  7. first_name STRING ,
  8. last_name STRING,
  9. email STRING,
  10. id INT
  11. ) WITH (
  12. 'connector' = 'jdbc',
  13. 'url' = 'jdbc:mysql://localhost:3306/inventory',
  14. 'username' = 'root',
  15. 'password' = 'debezium',
  16. 'table-name' = 'customers'
  17. )
  18. """;
  19. BT_ENV.sql_update(ddl)
  20. sinkddl = """
  21. CREATE TABLE print_table (
  22. f0 INT,
  23. f1 INT,
  24. f2 STRING,
  25. f3 DOUBLE
  26. ) WITH (
  27. 'connector' = 'print'
  28. )
  29. """;
  30. BT_ENV.sql_update(sinkddl)
  31. sqlquery("SELECT first_name, last_name FROM nba_player4 ");
  32. BT_ENV.execute("table_job")

但是,在运行代码时,会出现错误

  1. py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
  2. : org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
  3. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
  4. the classpath.
  5. Reason: Required context properties mismatch.
  6. The following properties are requested:
  7. connector=jdbc
  8. password=debezium
  9. schema.0.data-type=VARCHAR(2147483647)
  10. schema.0.name=first_name
  11. schema.1.data-type=VARCHAR(2147483647)
  12. schema.1.name=last_name
  13. schema.2.data-type=VARCHAR(2147483647)
  14. schema.2.name=email
  15. schema.3.data-type=INT
  16. schema.3.name=id
  17. table-name=customers
  18. url=jdbc:mysql://localhost:3306/inventory
  19. username=root
  20. The following factories have been considered:
  21. org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
  22. org.apache.flink.table.sources.CsvBatchTableSourceFactory
  23. org.apache.flink.table.sources.CsvAppendTableSourceFactory
  24. org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
  25. org.apache.flink.table.filesystem.FileSystemTableFactory

最新的:
这是我的docker yml文件。

  1. version: '2.1'
  2. services:
  3. jobmanager:
  4. build: .
  5. image: flink:latest
  6. hostname: "jobmanager"
  7. expose:
  8. - "6123"
  9. ports:
  10. - "8081:8081"
  11. command: jobmanager
  12. environment:
  13. - JOB_MANAGER_RPC_ADDRESS=jobmanager
  14. taskmanager:
  15. image: flink:latest
  16. expose:
  17. - "6121"
  18. - "6122"
  19. depends_on:
  20. - jobmanager
  21. command: taskmanager
  22. links:
  23. - jobmanager:jobmanager
  24. environment:
  25. - JOB_MANAGER_RPC_ADDRESS=jobmanager
  26. mysql:
  27. image: debezium/example-mysql
  28. ports:
  29. - "3306:3306"
  30. environment:
  31. - MYSQL_ROOT_PASSWORD=debezium
  32. - MYSQL_USER=mysqluser
  33. - MYSQL_PASSWORD=mysqlpw

docker ps命令显示

  1. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
  2. cf84c84f7821 flink "/docker-entrypoint.…" 2 minutes ago Up 2 minutes 6121-6123/tcp, 8081/tcp _taskmanager_1
  3. 09b19142d70a flink "/docker-entrypoint.…" 9 minutes ago Up 9 minutes 6123/tcp, 0.0.0.0:8081->8081/tcp _jobmanager_1
  4. 4ac01eb11bf7 debezium/example-mysql "docker-entrypoint.s…" 3 days ago Up 9 minutes 0.0.0.0:3306->3306/tcp, 33060/tcp keras-flask-dep

更多信息:
我现在在docker的flink环境是flink:scala_2.12-java8

  1. docker pull flink:scala_2.12-java8

pyflink jdbc连接器是flink 1.11版本的flink-connector-jdbc_2.11-1.11.2.jar。

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

为了使用jdbc库,我尝试了两种方法
将flink-connector-jdbc2.11-1.11.2.jar保存到/usr/local/lib/python3.7/site-packages/flink/lib中
在python应用程序中配置类路径

  1. base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
  2. flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
  3. BT_ENV.get_config().get_configuration().set_string("pipeline.jars",jars)

但仍然得到同样的错误

r6l8ljro

r6l8ljro1#

这也许不能完全回答这个问题,但是:从mysql的Angular 来看,您的 CREATE TABLE 语句不是有效的sql,将引发语法错误。原因是 VARCHAR 数据类型需要长度(即列可以容纳的最大字符数)。
例如:

  1. CREATE TABLE nba_player4 (
  2. first_name VARCHAR(20),
  3. last_name VARCHAR(20),
  4. email VARCHAR(50),
  5. id VARCHAR(10)
  6. );

现在这是有效的mysql代码。不过,我还建议在表中定义一个主键。在数据库设计中,这是一种很好的做法,原因有很多,其中一个原因是能够唯一地标识每个记录:这使得使用 WHERE 子句,或构建引用表的外键约束。一个名为 id 可能是一个很好的候选人-可能会更好地定义为一个自动递增的整数。
所以,玛比:

  1. CREATE TABLE nba_player4 (
  2. first_name VARCHAR(20),
  3. last_name VARCHAR(20),
  4. email VARCHAR(50),
  5. id INT PRIMARY KEY AUTO_INCREMENT
  6. );
展开查看全部
pobjuy32

pobjuy322#

您能验证您使用的所有组件版本吗。很可能您没有使用1.9版本的flink,因为我看到它生成了一种新的数据类型属性格式,这是在以后的版本中引入的。
在flink 1.9中(至少在我选中的1.9.3中是这样),属性的格式应该是: schema.#.type ,而你的情况是 schema.#.data-type .
我建议要么升级到最新的flink版本,要么至少确保使用相同版本的所有组件。

相关问题