Flink JdbcUpsertTableSink does not update MySQL data or overwrite old data

Posted by mfuanj7w on 2021-06-21 in Flink

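I have a question about using JdbcUpsertTableSink to write data to MySQL: existing rows are not updated, even though I have defined a primary key on the MySQL table.
I am using Flink 1.11.
I read data from Hive and write it to MySQL.
Here is my code: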

    // Flink imports below assume the 1.11 package layout.
    // ParameterToolFactory, CataLogEnum and FlinkProperEnum are project-specific classes (not shown).
    import java.io.IOException;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
    import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
    import org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.types.DataType;

    public class TmpTableExample {
        public static void main(String[] args) throws IOException {
            ParameterToolFactory parameterToolFactory = new ParameterToolFactory();
            ParameterTool tool = parameterToolFactory.createParameterTool();

            // Blink planner in batch mode
            EnvironmentSettings settings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);
            // The original set SqlDialect.HIVE and then immediately SqlDialect.DEFAULT;
            // only the last call takes effect, so only that one is kept.
            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

            // Register the Hive catalog and make it the current catalog
            HiveCatalog testCataLog = new HiveCatalog(
                    CataLogEnum.TEST.getCataLogName(),
                    CataLogEnum.TEST.getDbName(),
                    tool.get(FlinkProperEnum.FLINK_HIVE_CONF_DIR.key));
            tableEnv.registerCatalog(CataLogEnum.TEST.getCataLogName(), testCataLog);
            tableEnv.useCatalog(CataLogEnum.TEST.getCataLogName());

            String sql = "SELECT * FROM test.student as t1 JOIN test2.class t2 ON t1.id = t2.student_id";

            // JDBC connection settings for the MySQL target table
            JdbcOptions options = JdbcOptions.builder()
                    .setDBUrl(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_URL.key))
                    .setDriverName(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_DRIVER_CLASS_NAME.key))
                    .setUsername(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_USERNAME.key))
                    .setPassword(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_PASSWORD.key))
                    .setTableName("mysql_project_test")
                    .setDialect(new MySQLDialect())
                    .build();

            String[] fieldNames = {"student_id", "student_name", "student_curriculum", "student_score",
                    "student_dt", "class_id", "class_student_id", "class_name", "class_size", "class_dt"};
            DataType[] fieldTypes = {DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE(),
                    DataTypes.STRING(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT(),
                    DataTypes.STRING()};
            // NOTE: keys is declared here but never passed to the schema or the sink.
            String[] keys = {"student_id", "class_id"};
            TableSchema schema = TableSchema.builder()
                    .fields(fieldNames, fieldTypes)
                    .build();

            JdbcUpsertTableSink tableSink = JdbcUpsertTableSink.builder()
                    .setOptions(options)
                    .setTableSchema(schema)
                    .setFlushIntervalMills(1000)
                    // .setFlushMaxSize(10)
                    .build();
            tableEnv.registerTableSink("mysql_project_test", tableSink);

            // Run the join and write the result into the registered sink
            Table result = tableEnv.sqlQuery(sql);
            StatementSet statementSet = tableEnv.createStatementSet();
            statementSet.addInsert("mysql_project_test", result);
            statementSet.execute().print();
        }
    }

My MySQL table structure is as follows:

    CREATE TABLE `mysql_project_test` (
      `student_id` int(11) NOT NULL DEFAULT '0',
      `student_name` varchar(200) DEFAULT NULL,
      `student_curriculum` varchar(200) DEFAULT NULL,
      `student_score` double DEFAULT NULL,
      `student_dt` varchar(200) DEFAULT NULL,
      `class_id` int(11) NOT NULL DEFAULT '0',
      `class_student_id` int(11) DEFAULT NULL,
      `class_name` varchar(200) DEFAULT NULL,
      `class_size` int(11) DEFAULT NULL,
      `class_dt` varchar(200) DEFAULT NULL,
      PRIMARY KEY (`student_id`,`class_id`)
    );
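
For reference, overwriting an existing row that has the same (`student_id`, `class_id`) pair requires an upsert on the MySQL side, typically `INSERT ... ON DUPLICATE KEY UPDATE`. As far as I understand, this is roughly the shape of statement Flink's MySQL dialect produces when the sink knows its key fields, while a plain `INSERT` is used otherwise. The following is a minimal sketch in plain Java, with no Flink dependency, of how such a statement could be assembled. The class and method names (`MySqlUpsertSketch`, `upsertStatement`) are hypothetical and this is not Flink's own code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MySqlUpsertSketch {

    // Wraps an identifier in backticks, the MySQL quoting style.
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    // Builds an INSERT ... ON DUPLICATE KEY UPDATE statement with one '?' per column.
    // Every listed column is overwritten with the incoming value when the key already exists.
    static String upsertStatement(String table, List<String> columns) {
        String columnList = columns.stream()
                .map(MySqlUpsertSketch::quote)
                .collect(Collectors.joining(", "));
        String placeholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        String updates = columns.stream()
                .map(c -> quote(c) + "=VALUES(" + quote(c) + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + columnList + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        // A shortened column list, for illustration only.
        List<String> columns = Arrays.asList("student_id", "class_id", "student_score");
        System.out.println(upsertStatement("mysql_project_test", columns));
    }
}
```

Running a statement of this shape by hand against the table above is one way to confirm that the primary key itself behaves as expected, independently of Flink.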

When I modify a student's score in my Java program, the row is not updated, or Flink does not write the data at all.
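
One thing that may be worth checking is whether the sink is ever told which columns form the key: the `keys` array in the code is declared but never handed to the schema or the sink. A possible alternative in Flink 1.11, which I have not verified for this job, is to declare the sink through SQL DDL with the `jdbc` connector and a `PRIMARY KEY ... NOT ENFORCED` clause. The sketch below only builds that DDL string in plain Java. The class name `JdbcSinkDdlSketch`, the method `sinkDdl`, and the connection values are hypothetical, and the option names are the ones I understand the 1.11 JDBC connector to document.

```java
public class JdbcSinkDdlSketch {

    // Returns a Flink SQL CREATE TABLE statement for the MySQL sink.
    // url, username and password are placeholders supplied by the caller.
    static String sinkDdl(String url, String username, String password) {
        return String.join("\n",
                "CREATE TABLE mysql_project_test (",
                "  student_id INT NOT NULL,",
                "  student_name STRING,",
                "  student_curriculum STRING,",
                "  student_score DOUBLE,",
                "  student_dt STRING,",
                "  class_id INT NOT NULL,",
                "  class_student_id INT,",
                "  class_name STRING,",
                "  class_size INT,",
                "  class_dt STRING,",
                // Declares the composite key so the connector can write in upsert mode.
                "  PRIMARY KEY (student_id, class_id) NOT ENFORCED",
                ") WITH (",
                "  'connector' = 'jdbc',",
                "  'url' = '" + url + "',",
                "  'table-name' = 'mysql_project_test',",
                "  'username' = '" + username + "',",
                "  'password' = '" + password + "',",
                "  'sink.buffer-flush.interval' = '1s'",
                ")");
    }

    public static void main(String[] args) {
        // Hypothetical connection values, for illustration only.
        System.out.println(sinkDdl("jdbc:mysql://localhost:3306/test", "user", "secret"));
    }
}
```

The resulting string would be passed to `tableEnv.executeSql(...)` in place of the `registerTableSink` call, assuming the rest of the job stays unchanged.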
