[Flink] FlinkSQL Metadata Validation


1. Overview

Repost: FlinkSQL Metadata Validation

Since Flink 1.9, a CatalogManager has been introduced to manage Catalogs and CatalogBaseTables. When a DDL statement is executed, the table information is wrapped in a CatalogBaseTable and stored in the CatalogManager. Flink also extends Calcite's Schema interface so that Calcite can read the table information from the CatalogManager during the validate phase.

2. Writing the CatalogTable

By executing a DDL statement, we can see how the Blink planner parses the DDL and stores the result in the CatalogManager, focusing on how the computed column proctime AS PROCTIME() is parsed.

    CREATE TABLE user_address (
      userId BIGINT,
      addressInfo VARCHAR,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'topic' = 'tp02',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
    )
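
For reference, here is a minimal sketch of submitting this DDL through the Table API. The Blink planner settings and the DESCRIBE call are illustrative assumptions, not part of the original walkthrough:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateTableExample {
        public static void main(String[] args) {
            // Blink planner in streaming mode (the planner discussed in this article)
            EnvironmentSettings settings =
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // executeSql drives the parse/convert chain described next and stores
            // the resulting CatalogTable in the CatalogManager
            tEnv.executeSql(
                    "CREATE TABLE user_address (\n"
                            + "  userId BIGINT,\n"
                            + "  addressInfo VARCHAR,\n"
                            + "  proctime AS PROCTIME()\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'topic' = 'tp02',\n"
                            + "  'format' = 'json',\n"
                            + "  'scan.startup.mode' = 'latest-offset'\n"
                            + ")");

            // The registered schema should list proctime as a computed column
            tEnv.executeSql("DESCRIBE user_address").print();
        }
    }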

Executing the DDL drives the following createCatalogTable call chain:

    org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql
    org.apache.flink.table.planner.delegation.ParserImpl#parse
    org.apache.flink.table.planner.operations.SqlToOperationConverter#convert
    org.apache.flink.table.planner.operations.SqlCreateTableConverter#convertCreateTable
    // Extracts the CatalogTable from the SqlCreateTable statement
    org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
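
A hedged sketch of driving this chain from user code; it relies on the internal getParser() method of TableEnvironmentImpl, so it is shown only to make the chain tangible, not as a stable API:

    import java.util.List;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.internal.TableEnvironmentImpl;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.operations.Operation;
    import org.apache.flink.table.operations.ddl.CreateTableOperation;

    public class ParseDdlExample {
        public static void main(String[] args) {
            TableEnvironmentImpl tEnv = (TableEnvironmentImpl) TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // ParserImpl#parse -> SqlToOperationConverter#convert -> SqlCreateTableConverter
            List<Operation> operations = tEnv.getParser().parse(
                    "CREATE TABLE user_address (userId BIGINT, proctime AS PROCTIME()) "
                            + "WITH ('connector' = 'datagen')");

            // For a CREATE TABLE statement the result is a CreateTableOperation wrapping the CatalogTable
            CreateTableOperation createOp = (CreateTableOperation) operations.get(0);
            CatalogTable catalogTable = createOp.getCatalogTable();
            System.out.println(catalogTable.getSchema());
        }
    }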

createCatalogTable extracts the TableSchema, partition keys, primary key, comment, and other information from the SqlCreateTable and uses them to build a CatalogTableImpl.

    private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
        final TableSchema sourceTableSchema;
        final List<String> sourcePartitionKeys;
        final List<SqlTableLike.SqlTableLikeOption> likeOptions;
        final Map<String, String> sourceProperties;
        // Handle CREATE TABLE ... LIKE ...
        if (sqlCreateTable.getTableLike().isPresent()) {
            SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get();
            CatalogTable table = lookupLikeSourceTable(sqlTableLike);
            sourceTableSchema = table.getSchema();
            sourcePartitionKeys = table.getPartitionKeys();
            likeOptions = sqlTableLike.getOptions();
            sourceProperties = table.getProperties();
        } else {
            sourceTableSchema = TableSchema.builder().build();
            sourcePartitionKeys = Collections.emptyList();
            likeOptions = Collections.emptyList();
            sourceProperties = Collections.emptyMap();
        }
        // Handle the LIKE options: INCLUDING ALL, OVERWRITING OPTIONS, EXCLUDING PARTITIONS, etc.
        Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies =
            mergeTableLikeUtil.computeMergingStrategies(likeOptions);
        Map<String, String> mergedOptions = mergeOptions(sqlCreateTable, sourceProperties, mergingStrategies);
        // Extract the primary key
        Optional<SqlTableConstraint> primaryKey = sqlCreateTable.getFullConstraints()
            .stream()
            .filter(SqlTableConstraint::isPrimaryKey)
            .findAny();
        // Build the TableSchema
        TableSchema mergedSchema = mergeTableLikeUtil.mergeTables(
            mergingStrategies,
            sourceTableSchema, // for a plain CREATE TABLE (no LIKE clause) this is an empty TableSchema
            sqlCreateTable.getColumnList().getList(),
            sqlCreateTable.getWatermark().map(Collections::singletonList).orElseGet(Collections::emptyList),
            primaryKey.orElse(null)
        );
        // Partition keys
        List<String> partitionKeys = mergePartitions(
            sourcePartitionKeys,
            sqlCreateTable.getPartitionKeyList(),
            mergingStrategies
        );
        verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
        // Comment
        String tableComment = sqlCreateTable.getComment()
            .map(comment -> comment.getNlsString().getValue())
            .orElse(null);
        return new CatalogTableImpl(mergedSchema,
            partitionKeys,
            mergedOptions,
            tableComment);
    }
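
To make the result concrete, here is a minimal sketch of what the converter effectively builds for the DDL above. Constructing the CatalogTableImpl by hand like this is purely illustrative, and the proctime column is shown with a plain TIMESTAMP(3), since the time-attribute flag is resolved later:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.CatalogTableImpl;

    public class CatalogTableSketch {
        public static CatalogTable userAddressTable() {
            // Schema equivalent to the DDL: two physical columns plus one computed column
            TableSchema schema = TableSchema.builder()
                    .field("userId", DataTypes.BIGINT())
                    .field("addressInfo", DataTypes.STRING())
                    .field("proctime", DataTypes.TIMESTAMP(3), "PROCTIME()") // computed column keeps its expression
                    .build();

            Map<String, String> options = new HashMap<>();
            options.put("connector", "kafka");
            options.put("properties.bootstrap.servers", "localhost:9092");
            options.put("topic", "tp02");
            options.put("format", "json");
            options.put("scan.startup.mode", "latest-offset");

            // No partition keys, no comment
            return new CatalogTableImpl(schema, Collections.emptyList(), options, null);
        }
    }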

When the TableSchema is extracted, the Calcite column types are converted to Flink's internal data types. If the schema contains a computed column such as PROCTIME(), the expression is validated; the FlinkSqlOperatorTable class contains all of FlinkSQL's built-in functions.

    /**
     * Function used to access a processing time attribute.
     */
    public static final SqlFunction PROCTIME =
        new CalciteSqlFunction(
            "PROCTIME",
            SqlKind.OTHER_FUNCTION,
            PROCTIME_TYPE_INFERENCE,
            null,
            OperandTypes.NILADIC,
            SqlFunctionCategory.TIMEDATE,
            false
        );

How the TableColumns are generated:

  • Physical (non-computed) columns are type-converted and stored in the physicalFieldNamesToTypes map.
  • Computed columns derived from expressions are validated for existence (the function must be registered), and the RelDataType of the expression is returned.
  • Each field's RelDataType is converted to a LogicalType and then to a DataType, from which a TableColumn is built. The differences between the type systems deserve a closer look.
    private void appendDerivedColumns(
            Map<FeatureOption, MergingStrategy> mergingStrategies,
            List<SqlNode> derivedColumns) {
        // Convert the physical columns and store them in physicalFieldNamesToTypes
        collectPhysicalFieldsTypes(derivedColumns);
        for (SqlNode derivedColumn : derivedColumns) {
            final SqlTableColumn tableColumn = (SqlTableColumn) derivedColumn;
            final TableColumn column;
            if (tableColumn.isGenerated()) {
                String fieldName = tableColumn.getName().toString();
                // Validate the expression, e.g. that PROCTIME() is registered in FlinkSqlOperatorTable
                SqlNode validatedExpr = sqlValidator.validateParameterizedExpression(
                    tableColumn.getExpr().get(),
                    physicalFieldNamesToTypes);
                // Validated return type: for PROCTIME() the RelDataType is Flink's TimeIndicatorRelDataType
                final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
                column = TableColumn.of(
                    fieldName,
                    // RelDataType ---> LogicalType ---> DataType
                    fromLogicalToDataType(toLogicalType(validatedType)),
                    escapeExpressions.apply(validatedExpr));
                computedFieldNamesToTypes.put(fieldName, validatedType);
            } else {
                // Convert physical columns to Flink's internal data types
                String name = tableColumn.getName().getSimple();
                // RelDataType --> LogicalType --> DataType
                LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
                column = TableColumn.of(name, TypeConversions.fromLogicalToDataType(logicalType));
            }
            columns.put(column.getName(), column);
        }
    }

To understand the proctime computed column, we also need to look at the type bound when the PROCTIME function is defined, sketched below.
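
A small sketch of the type the proctime attribute eventually maps to in Flink's type system. Constructing it by hand here is only to show the TimestampKind.PROCTIME flag and the LogicalType-to-DataType conversion used throughout this article:

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.TimestampKind;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.flink.table.types.utils.TypeConversions;

    public class ProctimeTypeSketch {
        public static void main(String[] args) {
            // A processing-time attribute is a TIMESTAMP(3) flagged with TimestampKind.PROCTIME
            LogicalType proctimeType = new TimestampType(true, TimestampKind.PROCTIME, 3);
            DataType dataType = TypeConversions.fromLogicalToDataType(proctimeType);
            System.out.println(dataType); // e.g. TIMESTAMP(3) *PROCTIME*
        }
    }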

3. Reading Metadata during Validate

FlinkSchema has three subclasses: CatalogManagerCalciteSchema, CatalogCalciteSchema, and DatabaseCalciteSchema. During Calcite's validation, the overridden getSubSchema method is called to obtain the Catalog and Database in turn, and the Table information is finally fetched from the catalogManager.
By implementing a custom Schema we can observe how Calcite looks up table schema information. Test case:

    // This schema contains two tables: USERS and JOBS
    public class CatalogManagerCalciteSchema implements Schema {
        static Map<String, Table> TABLES = Maps.newHashMap();

        static {
            TABLES.put("USERS", new AbstractTable() { // note: add a table
                @Override
                public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                    RelDataTypeFactory.Builder builder = typeFactory.builder();
                    builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.INTEGER));
                    builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.CHAR));
                    builder.add("AGE", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.INTEGER));
                    return builder.build();
                }
            });
            TABLES.put("JOBS", new AbstractTable() {
                @Override
                public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                    RelDataTypeFactory.Builder builder = typeFactory.builder();
                    builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.INTEGER));
                    builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.CHAR));
                    builder.add("COMPANY", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.CHAR));
                    return builder.build();
                }
            });
        }

        @Override
        public Table getTable(String name) {
            return TABLES.get(name);
        }

        @Override
        public Set<String> getTableNames() {
            return null;
        }

        @Override
        public RelProtoDataType getType(String name) {
            return null;
        }

        @Override
        public Set<String> getTypeNames() {
            return null;
        }

        @Override
        public Collection<Function> getFunctions(String name) {
            return null;
        }

        @Override
        public Set<String> getFunctionNames() {
            return Collections.emptySet();
        }

        @Override
        public Schema getSubSchema(String name) {
            return null;
        }

        @Override
        public Set<String> getSubSchemaNames() {
            return null;
        }

        @Override
        public Expression getExpression(SchemaPlus parentSchema, String name) {
            return null;
        }

        @Override
        public boolean isMutable() {
            return false;
        }

        @Override
        public Schema snapshot(SchemaVersion version) {
            return this;
        }
    }

Example:

    public static void main(String[] args) throws SqlParseException {
        // CatalogManagerCalciteSchema here is the custom class above, not Flink's internal one
        CalciteSchema rootSchema =
            CalciteSchemaBuilder.asRootSchema(new CatalogManagerCalciteSchema());
        SchemaPlus schemaPlus = rootSchema.plus();
        SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
        // Create a CalciteCatalogReader that reads metadata from the SimpleCalciteSchema during the rel phase
        CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader(
            CalciteSchema.from(schemaPlus),
            CalciteSchema.from(schemaPlus).path(null),
            factory,
            new CalciteConnectionConfigImpl(new Properties()));
        String sql = "select u.id as user_id, u.name as user_name, j.company as user_company, u.age as user_age \n"
            + "from users u join jobs j on u.name=j.name";
        SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
        SqlNode sqlNode = parser.parseStmt();
        SqlValidator validator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), calciteCatalogReader, factory,
            SqlConformanceEnum.DEFAULT);
        SqlNode validateSqlNode = validator.validate(sqlNode);
        System.out.println(validateSqlNode);
    }

This is the call path through which Calcite's validation visits the CatalogManagerCalciteSchema: when getSubSchema returns null, there is no sub-schema, and the table information is read from the current schema.

Flink defines a three-level schema and calls getSubSchema to read the Catalog, Database, and Table from the CatalogManager. For the exact call sequence, see org.apache.calcite.sql.validate.EmptyScope#resolve_. A sketch of the lookup order follows.
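
A hedged sketch of that lookup order, written against Calcite's Schema interface; the helper name and the catalog/database names in the usage comment are assumptions:

    import org.apache.calcite.schema.Schema;
    import org.apache.calcite.schema.Table;

    public class ThreeLevelLookupSketch {
        /** Resolve catalog.database.table by walking getSubSchema twice, then getTable. */
        public static Table resolveTable(Schema root, String catalog, String database, String table) {
            Schema catalogSchema = root.getSubSchema(catalog);            // CatalogCalciteSchema in Flink
            if (catalogSchema == null) {
                return null;
            }
            Schema databaseSchema = catalogSchema.getSubSchema(database); // DatabaseCalciteSchema in Flink
            if (databaseSchema == null) {
                return null;
            }
            return databaseSchema.getTable(table);                        // CatalogSchemaTable in Flink
        }

        // Usage (names assumed): resolveTable(root, "default_catalog", "default_database", "user_address")
    }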

CatalogManagerCalciteSchema#getSubSchema: uses the catalog name from the table identifier to obtain a CatalogCalciteSchema from the catalogManager.

    @Override
    public Schema getSubSchema(String name) {
        if (catalogManager.schemaExists(name)) {
            return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
        } else {
            return null;
        }
    }

CatalogCalciteSchema#getSubSchema: uses the database name from the table identifier to obtain a DatabaseCalciteSchema from the catalogManager.

    /**
     * Look up a sub-schema (database) by the given sub-schema name.
     *
     * @param schemaName name of sub-schema to look up
     * @return the sub-schema with a given database name, or null
     */
    @Override
    public Schema getSubSchema(String schemaName) {
        if (catalogManager.schemaExists(catalogName, schemaName)) {
            return new DatabaseCalciteSchema(schemaName, catalogName, catalogManager, isStreamingMode);
        } else {
            return null;
        }
    }

DatabaseCalciteSchema has no sub-schemas, so the Table information is obtained from this schema directly.

    public Table getTable(String tableName) {
        ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
        return catalogManager.getTable(identifier)
            .map(result -> {
                CatalogBaseTable table = result.getTable();
                FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
                return new CatalogSchemaTable(
                    identifier,
                    result,
                    statistic,
                    catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
                    isStreamingMode);
            })
            .orElse(null);
    }

4. Proctime Field Validation

When FlinkSQL reads the table schema during validation, it converts the rowtime/proctime computed column types into RelDataTypes that Calcite can recognize. The type-conversion code for computed columns is listed first.

    // CatalogManager
    public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
        CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
        if (temporaryTable != null) {
            TableSchema resolvedSchema = resolveTableSchema(temporaryTable);
            return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
        } else {
            return getPermanentTable(objectIdentifier);
        }
    }

    // org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolve
    /**
     * Resolve the computed column's type for the given schema.
     *
     * @param tableSchema Table schema to derive table field names and data types
     * @return the resolved TableSchema
     */
    public TableSchema resolve(TableSchema tableSchema) {
        final String rowtime;
        String[] fieldNames = tableSchema.getFieldNames();
        DataType[] fieldTypes = tableSchema.getFieldDataTypes();
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
            TableColumn tableColumn = tableSchema.getTableColumns().get(i);
            DataType fieldType = fieldTypes[i];
            if (tableColumn.isGenerated()) {
                // Derive the DataType from the computed column's expression
                fieldType = resolveExpressionDataType(tableColumn.getExpr().get(), tableSchema);
                if (isProctime(fieldType)) {
                    if (fieldNames[i].equals(rowtime)) {
                        throw new TableException("Watermark can not be defined for a processing time attribute column.");
                    }
                }
            }
            ......
            if (tableColumn.isGenerated()) {
                builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
            } else {
                builder.field(fieldNames[i], fieldType);
            }
        }
        tableSchema.getWatermarkSpecs().forEach(builder::watermark);
        tableSchema.getPrimaryKey().ifPresent(
            pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
        return builder.build();
    }

    // org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolveExpressionDataType
    private DataType resolveExpressionDataType(String expr, TableSchema tableSchema) {
        ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
        if (resolvedExpr == null) {
            throw new ValidationException("Could not resolve field expression: " + expr);
        }
        return resolvedExpr.getOutputDataType();
    }

    // org.apache.flink.table.planner.delegation.ParserImpl#parseSqlExpression
    public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
        SqlExprToRexConverter sqlExprToRexConverter = sqlExprToRexConverterCreator.apply(inputSchema);
        RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
        // [[RelDataType]] ----> [[LogicalType]]
        LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
        return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
    }

    // org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl#convertToRexNodes
    public RexNode[] convertToRexNodes(String[] exprs) {
        // Build a query against a temporary table to obtain the RexNodes
        String query = String.format(QUERY_FORMAT, String.join(",", exprs));
        SqlNode parsed = planner.parser().parse(query);
        SqlNode validated = planner.validate(parsed);
        // Convert to a RelNode
        RelNode rel = planner.rel(validated).rel;
        // The plan should be in the following tree
        // LogicalProject
        //   +- TableScan
        if (rel instanceof LogicalProject
                && rel.getInput(0) != null
                && rel.getInput(0) instanceof TableScan) {
            return ((LogicalProject) rel).getProjects().toArray(new RexNode[0]);
        } else {
            throw new IllegalStateException("The root RelNode should be LogicalProject, but is " + rel.toString());
        }
    }

The rough flow of PROCTIME() type extraction:

  1. 包含计算列则将建表语句中TableSchema注册为一张表temp_table。
  2. 根据建表中的计算列的表达式,例如proctime(),构建临时查询语句select proctime() from temp_table,proctime() 为Flink 内置函数。
  3. 对该查询语句进行validate,并转换RelNode,从RelNode提取行表达式RexNode。
  4. 从RexNode提取proctime() 对应的RelDataType,最终转换为DataType。
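
A small sketch of the query generated in step 2; the temporary table name shown here is an assumption (the real name is an internal constant in SqlExprToRexConverterImpl):

    public class ComputedColumnQuerySketch {
        public static void main(String[] args) {
            // The computed-column expressions taken from the DDL
            String[] exprs = {"PROCTIME()"};

            // QUERY_FORMAT in SqlExprToRexConverterImpl is roughly "SELECT %s FROM <temp table>";
            // "__temp_table__" is a placeholder name assumed for illustration
            String query = String.format("SELECT %s FROM __temp_table__", String.join(",", exprs));
            System.out.println(query); // SELECT PROCTIME() FROM __temp_table__
        }
    }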

The handling of computed columns before [FLINK-18378], which treated proctime and rowtime as separate cases:
Open question: the DDL handling has already converted proctime into a DataType, so when validation fetches the table schema it could simply reuse that fieldType; why is the expression resolved again?

    for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
        TableColumn tableColumn = tableSchema.getTableColumns().get(i);
        DataType fieldType = fieldTypes[i];
        if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
            if (fieldNames[i].equals(rowtime)) {
                throw new TableException("Watermark can not be defined for a processing time attribute column.");
            }
            TimestampType originalType = (TimestampType) fieldType.getLogicalType();
            LogicalType proctimeType = new TimestampType(
                originalType.isNullable(),
                TimestampKind.PROCTIME,
                originalType.getPrecision());
            fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
        } else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
            TimestampType originalType = (TimestampType) fieldType.getLogicalType();
            LogicalType rowtimeType = new TimestampType(
                originalType.isNullable(),
                TimestampKind.ROWTIME,
                originalType.getPrecision());
            fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
        }
        if (tableColumn.isGenerated()) {
            builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
        } else {
            builder.field(fieldNames[i], fieldType);
        }
    }
