Since Flink 1.9, a CatalogManager has been introduced to manage Catalogs and CatalogBaseTables. When a DDL statement is executed, the table information is wrapped in a CatalogBaseTable and stored in the CatalogManager. Flink also extends Calcite's Schema interface so that Calcite can read the table information held by the CatalogManager during the validate phase.
By executing a DDL statement, we can trace how the Blink planner parses it and stores the result in the CatalogManager, focusing on how the computed column proctime AS PROCTIME() is handled.
CREATE TABLE user_address (
userId BIGINT,
addressInfo VARCHAR,
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'tp02',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
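For orientation, here is a minimal sketch of executing this DDL (setup details assumed; Blink planner, streaming mode), which is what drives the parse/convert chain traced below:
// minimal sketch, assuming a Flink 1.11+ dependency set
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// executeSql routes the statement into ParserImpl#parse and SqlToOperationConverter#convert
tEnv.executeSql("CREATE TABLE user_address (...)"); // the DDL shown above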
The call chain that leads to createCatalogTable:
org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql
org.apache.flink.table.planner.delegation.ParserImpl#parse
org.apache.flink.table.planner.operations.SqlToOperationConverter#convert
org.apache.flink.table.planner.operations.SqlCreateTableConverter#convertCreateTable
// extracts the CatalogTable from the SqlCreateTable statement
org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
createCatalogTable extracts the TableSchema, partition keys, primary key, and comment from the SqlCreateTable node and uses them to build a CatalogTableImpl.
private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
final TableSchema sourceTableSchema;
final List<String> sourcePartitionKeys;
final List<SqlTableLike.SqlTableLikeOption> likeOptions;
final Map<String, String> sourceProperties;
// handle CREATE TABLE ... LIKE ...
if (sqlCreateTable.getTableLike().isPresent()) {
SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get();
CatalogTable table = lookupLikeSourceTable(sqlTableLike);
sourceTableSchema = table.getSchema();
sourcePartitionKeys = table.getPartitionKeys();
likeOptions = sqlTableLike.getOptions();
sourceProperties = table.getProperties();
} else {
sourceTableSchema = TableSchema.builder().build();
sourcePartitionKeys = Collections.emptyList();
likeOptions = Collections.emptyList();
sourceProperties = Collections.emptyMap();
}
// process the SqlTableLike options: INCLUDING ALL, OVERWRITING OPTIONS, EXCLUDING PARTITIONS, etc.
Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies =
mergeTableLikeUtil.computeMergingStrategies(likeOptions);
Map<String, String> mergedOptions = mergeOptions(sqlCreateTable, sourceProperties, mergingStrategies);
// extract the primary key
Optional<SqlTableConstraint> primaryKey = sqlCreateTable.getFullConstraints()
.stream()
.filter(SqlTableConstraint::isPrimaryKey)
.findAny();
// derive the merged TableSchema
TableSchema mergedSchema = mergeTableLikeUtil.mergeTables(
mergingStrategies,
sourceTableSchema, // for a plain CREATE TABLE (no LIKE clause) this is the empty schema built above, not null
sqlCreateTable.getColumnList().getList(),
sqlCreateTable.getWatermark().map(Collections::singletonList).orElseGet(Collections::emptyList),
primaryKey.orElse(null)
);
// partition keys
List<String> partitionKeys = mergePartitions(
sourcePartitionKeys,
sqlCreateTable.getPartitionKeyList(),
mergingStrategies
);
verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
// table comment
String tableComment = sqlCreateTable.getComment()
.map(comment -> comment.getNlsString().getValue())
.orElse(null);
return new CatalogTableImpl(mergedSchema,
partitionKeys,
mergedOptions,
tableComment);
}
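The CatalogTable returned here is wrapped in a CreateTableOperation and ultimately registered in the CatalogManager. A simplified sketch of that final step (the identifier values are illustrative):
// simplified from TableEnvironmentImpl: register the new table under its
// fully qualified identifier; ignoreIfExists = false
ObjectIdentifier identifier = ObjectIdentifier.of(
        "default_catalog", "default_database", "user_address");
catalogManager.createTable(catalogTable, identifier, false);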
While the TableSchema is being extracted, the Calcite column types are converted into Flink's internal data types. If the table contains a computed column such as proctime AS PROCTIME(), the expression is validated against FlinkSqlOperatorTable, the class that registers all of Flink SQL's built-in functions.
/**
* Function used to access a processing time attribute.
*/
public static final SqlFunction PROCTIME =
new CalciteSqlFunction(
"PROCTIME",
SqlKind.OTHER_FUNCTION,
PROCTIME_TYPE_INFERENCE,
null,
OperandTypes.NILADIC,
SqlFunctionCategory.TIMEDATE,
false
);
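The binding above is what ultimately marks the column as a processing-time attribute: OperandTypes.NILADIC means the function takes no arguments, and PROCTIME_TYPE_INFERENCE produces Flink's TimeIndicatorRelDataType. A conceptual sketch of that inference (not the literal Flink source):
// conceptual only: the inferred return type is the proctime time-indicator
// type created by FlinkTypeFactory, not a plain Calcite TIMESTAMP
private static final SqlReturnTypeInference PROCTIME_TYPE_INFERENCE =
        ReturnTypes.explicit(
                factory -> ((FlinkTypeFactory) factory).createProctimeIndicatorType(true));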
How the TableColumns are generated:
private void appendDerivedColumns(
Map<FeatureOption, MergingStrategy> mergingStrategies,
List<SqlNode> derivedColumns) {
// convert the non-computed (physical) columns and store them in physicalFieldNamesToTypes
collectPhysicalFieldsTypes(derivedColumns);
for (SqlNode derivedColumn : derivedColumns) {
final SqlTableColumn tableColumn = (SqlTableColumn) derivedColumn;
final TableColumn column;
if (tableColumn.isGenerated()) {
String fieldName = tableColumn.getName().toString();
// validate the expression, e.g. check that PROCTIME() is registered in FlinkSqlOperatorTable
SqlNode validatedExpr = sqlValidator.validateParameterizedExpression(
tableColumn.getExpr().get(),
physicalFieldNamesToTypes);
// the validated return type: for PROCTIME() the RelDataType is Flink's TimeIndicatorRelDataType extension
final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
column = TableColumn.of(
fieldName,
// RelDataType--->LogicalType--->DataType
fromLogicalToDataType(toLogicalType(validatedType)),
escapeExpressions.apply(validatedExpr));
computedFieldNamesToTypes.put(fieldName, validatedType);
} else {
// convert non-computed columns to Flink's internal data types
String name = tableColumn.getName().getSimple();
// RelDataType --> LogicalType --> DataType
LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
column = TableColumn.of(name, TypeConversions.fromLogicalToDataType(logicalType));
}
columns.put(column.getName(), column);
}
}
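After this step each computed column carries both its resolved DataType and the original expression string. A quick way to inspect the result once the DDL above has been executed (the printed format is approximate):
// assumes the user_address DDL was executed on tEnv as sketched earlier
TableSchema schema = tEnv.from("user_address").getSchema();
System.out.println(schema);
// root
//  |-- userId: BIGINT
//  |-- addressInfo: STRING
//  |-- proctime: TIMESTAMP(3) *PROCTIME* AS PROCTIME()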
For the type carried by the computed proctime column, refer back to the type bound in the PROCTIME function definition above.
FlinkSchema has three subclasses: CatalogManagerCalciteSchema, CatalogCalciteSchema, and DatabaseCalciteSchema. During validation, Calcite calls the overridden getSubSchema method to walk down through the Catalog and Database levels and finally fetches the matching Table from the catalogManager.
To see how Calcite reads table schema information, here is a test case built around a custom Schema implementation.
// this schema exposes two tables: USERS and JOBS
public class CatalogManagerCalciteSchema implements Schema {
static Map<String, Table> TABLES = Maps.newHashMap();
static {
TABLES.put("USERS", new AbstractTable() { //note: add a table
@Override
public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
RelDataTypeFactory.Builder builder = typeFactory.builder();
builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
}, SqlTypeName.INTEGER));
builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
}, SqlTypeName.CHAR));
builder.add("AGE", new BasicSqlType(new RelDataTypeSystemImpl() {
}, SqlTypeName.INTEGER));
return builder.build();
}
});
TABLES.put("JOBS", new AbstractTable() {
@Override
public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
RelDataTypeFactory.Builder builder = typeFactory.builder();
builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
}, SqlTypeName.INTEGER));
builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
}, SqlTypeName.CHAR));
builder.add("COMPANY", new BasicSqlType(new RelDataTypeSystemImpl() {
}, SqlTypeName.CHAR));
return builder.build();
}
});
}
@Override
public Table getTable(String name) {
return TABLES.get(name);
}
@Override
public Set<String> getTableNames() {
return TABLES.keySet();
}
@Override
public RelProtoDataType getType(String name) {
return null;
}
@Override
public Set<String> getTypeNames() {
return Collections.emptySet();
}
@Override
public Collection<Function> getFunctions(String name) {
return Collections.emptyList();
}
@Override
public Set<String> getFunctionNames() {
return Collections.emptySet();
}
@Override
public Schema getSubSchema(String name) {
return null;
}
@Override
public Set<String> getSubSchemaNames() {
return Collections.emptySet();
}
@Override
public Expression getExpression(SchemaPlus parentSchema, String name) {
return null;
}
@Override
public boolean isMutable() {
return false;
}
@Override
public Schema snapshot(SchemaVersion version) {
return this;
}
}
Example:
public static void main(String[] args) throws SqlParseException {
// this CatalogManagerCalciteSchema is the custom class above, not Flink's internal one
CalciteSchema rootSchema =
CalciteSchemaBuilder.asRootSchema(new CatalogManagerCalciteSchema());
SchemaPlus schemaPlus = rootSchema.plus();
SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
// create a CalciteCatalogReader that reads metadata from the schema during validation and rel conversion
CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader(
CalciteSchema.from(schemaPlus),
CalciteSchema.from(schemaPlus).path(null),
factory,
new CalciteConnectionConfigImpl(new Properties()));
String sql = "select u.id as user_id, u.name as user_name, j.company as user_company, u.age as user_age \n"
+ "from users u join jobs j on u.name=j.name";
SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
SqlNode sqlNode = parser.parseStmt();
SqlValidator
validator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), calciteCatalogReader, factory,
SqlConformanceEnum.DEFAULT);
SqlNode validateSqlNode = validator.validate(sqlNode);
System.out.println(validateSqlNode);
}
This is the call chain through which Calcite's validator visits CatalogManagerCalciteSchema: whenever getSubSchema returns null there is no child schema, so the table is looked up in the current schema instead. Running the example prints the validated statement back as SQL.
Flink defines a three-level schema and reads the Catalog, Database, and Table from the CatalogManager through successive getSubSchema calls; see org.apache.calcite.sql.validate.EmptyScope#resolve_ for the exact resolution logic.
CatalogManagerCalciteSchema#getSubSchema: uses the catalog part of the table name to obtain a CatalogCalciteSchema backed by the catalogManager.
@Override
public Schema getSubSchema(String name) {
if (catalogManager.schemaExists(name)) {
return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
} else {
return null;
}
}
CatalogCalciteSchema#getSubSchema: uses the database part of the table name to obtain a DatabaseCalciteSchema backed by the catalogManager.
/**
* Look up a sub-schema (database) by the given sub-schema name.
*
* @param schemaName name of sub-schema to look up
* @return the sub-schema with a given database name, or null
*/
@Override
public Schema getSubSchema(String schemaName) {
if (catalogManager.schemaExists(catalogName, schemaName)) {
return new DatabaseCalciteSchema(schemaName, catalogName, catalogManager, isStreamingMode);
} else {
return null;
}
}
DatabaseCalciteSchema has no sub-schemas, so the Table itself is fetched at this level (DatabaseCalciteSchema#getTable):
public Table getTable(String tableName) {
ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
return catalogManager.getTable(identifier)
.map(result -> {
CatalogBaseTable table = result.getTable();
FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
return new CatalogSchemaTable(
identifier,
result,
statistic,
catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
isStreamingMode);
})
.orElse(null);
}
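Putting the three levels together, a fully qualified table reference resolves top-down. A hypothetical illustration (constructor signatures vary slightly across Flink versions):
// default_catalog.default_database.user_address resolves in three hops
Schema root = new CatalogManagerCalciteSchema(catalogManager, true); // streaming mode
Schema catalog = root.getSubSchema("default_catalog");      // CatalogCalciteSchema
Schema database = catalog.getSubSchema("default_database"); // DatabaseCalciteSchema
Table table = database.getTable("user_address");            // CatalogSchemaTable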
When Flink SQL reads the table schema during validation, the types of rowtime/proctime computed columns are converted into RelDataTypes that Calcite can recognize. The computed-column type-conversion code is listed below.
## org.apache.flink.table.catalog.CatalogManager#getTable
public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
if (temporaryTable != null) {
TableSchema resolvedSchema = resolveTableSchema(temporaryTable);
return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
} else {
return getPermanentTable(objectIdentifier);
}
}
## org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolve
/**
* Resolve the computed column's type for the given schema.
*
* @param tableSchema Table schema to derive table field names and data types
* @return the resolved TableSchema
*/
public TableSchema resolve(TableSchema tableSchema) {
final String rowtime;
// (initialization elided in this excerpt: when the table defines a watermark
// spec, rowtime is taken from its rowtime attribute, otherwise it stays null)
String[] fieldNames = tableSchema.getFieldNames();
DataType[] fieldTypes = tableSchema.getFieldDataTypes();
TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
TableColumn tableColumn = tableSchema.getTableColumns().get(i);
DataType fieldType = fieldTypes[i];
if (tableColumn.isGenerated()) {
// resolve the computed column's expression to obtain its DataType
fieldType = resolveExpressionDataType(tableColumn.getExpr().get(), tableSchema);
if (isProctime(fieldType)) {
if (fieldNames[i].equals(rowtime)) {
throw new TableException("Watermark can not be defined for a processing time attribute column.");
}
}
}
// ...... (remaining handling elided)
if (tableColumn.isGenerated()) {
builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
} else {
builder.field(fieldNames[i], fieldType);
}
}
tableSchema.getWatermarkSpecs().forEach(builder::watermark);
tableSchema.getPrimaryKey().ifPresent(
pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
return builder.build();
}
## org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolveExpressionDataType
private DataType resolveExpressionDataType(String expr, TableSchema tableSchema) {
ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
if (resolvedExpr == null) {
throw new ValidationException("Could not resolve field expression: " + expr);
}
return resolvedExpr.getOutputDataType();
}
## org.apache.flink.table.planner.delegation.ParserImpl#parseSqlExpression
public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
SqlExprToRexConverter sqlExprToRexConverter = sqlExprToRexConverterCreator.apply(inputSchema);
RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
// RelDataType -> LogicalType
LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
}
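A hypothetical usage of this method: resolving the stored expression string against the table's physical schema yields the proctime attribute type directly.
// illustration only: resolve the computed column's expression
ResolvedExpression expr = parser.parseSqlExpression("PROCTIME()", tableSchema);
System.out.println(expr.getOutputDataType()); // TIMESTAMP(3) *PROCTIME*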
## org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl#convertToRexNodes
public RexNode[] convertToRexNodes(String[] exprs) {
// build a query over a temporary table to obtain the RexNodes
String query = String.format(QUERY_FORMAT, String.join(",", exprs));
SqlNode parsed = planner.parser().parse(query);
SqlNode validated = planner.validate(parsed);
// convert to a RelNode
RelNode rel = planner.rel(validated).rel;
// The plan should be shaped like the following tree:
// LogicalProject
// +- TableScan
if (rel instanceof LogicalProject
&& rel.getInput(0) != null
&& rel.getInput(0) instanceof TableScan) {
return ((LogicalProject) rel).getProjects().toArray(new RexNode[0]);
} else {
throw new IllegalStateException("The root RelNode should be LogicalProject, but is " + rel.toString());
}
}
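The temporary-table trick above is driven by two constants in SqlExprToRexConverterImpl (names as in the Flink source of this era, shown here for orientation):
private static final String TEMPORARY_TABLE_NAME = "__temp_table__";
private static final String QUERY_FORMAT = "SELECT %s FROM " + TEMPORARY_TABLE_NAME;
// for the computed column above, the generated helper query is therefore
//   SELECT PROCTIME() FROM __temp_table__
// and validating that projection yields a RexNode typed as TimeIndicatorRelDataType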
That is the rough flow of extracting the PROCTIME() type: CatalogTableSchemaResolver#resolve -> ParserImpl#parseSqlExpression -> SqlExprToRexConverterImpl#convertToRexNodes, then RelDataType -> LogicalType -> DataType.
For contrast, this is how computed columns were handled before [FLINK-18378], with separate special cases for proctime and rowtime.
Open question: the DDL phase has already converted proctime into a DataType, so when validation fetches the table schema it could simply reuse that fieldType; why is the expression parsed again?
for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
TableColumn tableColumn = tableSchema.getTableColumns().get(i);
DataType fieldType = fieldTypes[i];
if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
if (fieldNames[i].equals(rowtime)) {
throw new TableException("Watermark can not be defined for a processing time attribute column.");
}
TimestampType originalType = (TimestampType) fieldType.getLogicalType();
LogicalType proctimeType = new TimestampType(
originalType.isNullable(),
TimestampKind.PROCTIME,
originalType.getPrecision());
fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
TimestampType originalType = (TimestampType) fieldType.getLogicalType();
LogicalType rowtimeType = new TimestampType(
originalType.isNullable(),
TimestampKind.ROWTIME,
originalType.getPrecision());
fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
}
if (tableColumn.isGenerated()) {
builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
} else {
builder.field(fieldNames[i], fieldType);
}
}
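The branches above rewrite a plain TIMESTAMP(3) into a time-indicator type. A minimal illustration of the type objects involved (summary strings are approximate):
// TimestampKind distinguishes regular, proctime, and rowtime timestamps
TimestampType plain = new TimestampType(true, TimestampKind.REGULAR, 3);
TimestampType proctime = new TimestampType(true, TimestampKind.PROCTIME, 3);
System.out.println(plain);    // TIMESTAMP(3)
System.out.println(proctime); // TIMESTAMP(3) *PROCTIME*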
Original article: https://blog.csdn.net/qq_21383435/article/details/122769008 (reposted; copyright belongs to the original author).