[Flink] Flink SQL Custom Source Format


1. Overview

Reposted from: Flink SQL Custom Source Format

1. Background

The JSON messages in Kafka are nested, and we don't want a second round of deserialization just to flatten them, so we implement a custom format.

2. Steps

1. Implement a custom Factory, i.e. a class that implements DeserializationFormatFactory.

2. Implement a custom DeserializationSchema, i.e. a class that implements DeserializationSchema.

The createDecodingFormat method of the custom Factory returns a DecodingFormat whose createRuntimeDecoder instantiates the custom DeserializationSchema.

3. Custom Format

To keep things simple, we define a NullFormat: no matter what the message in Kafka is, it returns null, which is equivalent to Kafka containing no messages at all.

Custom Factory

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.formats.json.JsonOptions;
    import org.apache.flink.formats.json.TimestampFormat;
    import org.apache.flink.table.api.ValidationException;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.format.DecodingFormat;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.factories.DeserializationFormatFactory;
    import org.apache.flink.table.factories.DynamicTableFactory;
    import org.apache.flink.table.factories.FactoryUtil;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import static org.apache.flink.formats.json.JsonOptions.*;

    /**
     * Table format factory for providing configured instances of JSON to RowData
     * {@link SerializationSchema} and {@link DeserializationSchema}.
     */
    public class NullFormatFactory implements DeserializationFormatFactory {

        // Unique identifier of the factory, referenced by 'format' = 'null' in the DDL
        public static final String IDENTIFIER = "null";

        @SuppressWarnings("unchecked")
        @Override
        // Entry point of the decoding side; this part is essentially boilerplate
        public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
                DynamicTableFactory.Context context,
                ReadableConfig formatOptions) {
            FactoryUtil.validateFactoryOptions(this, formatOptions);
            validateFormatOptions(formatOptions);

            final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
            final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
            TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

            return new DecodingFormat<DeserializationSchema<RowData>>() {
                @Override
                public DeserializationSchema<RowData> createRuntimeDecoder(
                        DynamicTableSource.Context context, // ScanRuntimeProviderContext
                        DataType producedDataType) {        // field names and data types of the table
                    final RowType rowType = (RowType) producedDataType.getLogicalType();
                    final TypeInformation<RowData> rowDataTypeInfo =
                            (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
                    return new NullRowDataDeserializationSchema(
                            rowType,
                            rowDataTypeInfo,
                            failOnMissingField,
                            ignoreParseErrors,
                            timestampOption);
                }

                @Override
                public ChangelogMode getChangelogMode() {
                    return ChangelogMode.insertOnly();
                }
            };
        }

        @Override
        public String factoryIdentifier() {
            return IDENTIFIER;
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            return Collections.emptySet();
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            Set<ConfigOption<?>> options = new HashSet<>();
            options.add(FAIL_ON_MISSING_FIELD);
            options.add(IGNORE_PARSE_ERRORS);
            options.add(TIMESTAMP_FORMAT);
            return options;
        }

        // ------------------------------------------------------------------------
        // Validation
        // ------------------------------------------------------------------------

        static void validateFormatOptions(ReadableConfig tableOptions) {
            boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
            boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
            String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
            if (ignoreParseErrors && failOnMissingField) {
                throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
                        + " and "
                        + IGNORE_PARSE_ERRORS.key()
                        + " shouldn't both be true.");
            }
            if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {
                throw new ValidationException(String.format(
                        "Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
                        timestampFormat, TIMESTAMP_FORMAT.key()));
            }
        }
    }
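
For Flink to find the factory at runtime, it has to be registered through the standard Java SPI mechanism, just like the built-in formats: create the file src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory containing the factory's fully qualified class name on a single line. The package shown below is only a placeholder, since the code above omits the package declaration; use the class's real package:

    com.example.format.NullFormatFactory

Without this service entry, Flink cannot resolve 'format' = 'null' when the table is created.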

Custom DeserializationSchema

    import org.apache.flink.annotation.Internal;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.json.TimestampFormat;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    import java.io.IOException;
    import java.util.Objects;

    import static org.apache.flink.util.Preconditions.checkNotNull;

    @Internal
    public class NullRowDataDeserializationSchema implements DeserializationSchema<RowData> {

        private static final long serialVersionUID = 1L;

        /** Flag indicating whether to fail if a field is missing. */
        private final boolean failOnMissingField;

        /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
        private final boolean ignoreParseErrors;

        /** TypeInformation of the produced {@link RowData}. */
        private final TypeInformation<RowData> resultTypeInfo;

        /** Object mapper for parsing the JSON. */
        private final ObjectMapper objectMapper = new ObjectMapper();

        /** Timestamp format specification which is used to parse timestamps. */
        private final TimestampFormat timestampFormat;

        public NullRowDataDeserializationSchema(
                RowType rowType,
                TypeInformation<RowData> resultTypeInfo,
                boolean failOnMissingField,
                boolean ignoreParseErrors,
                TimestampFormat timestampFormat) {
            if (ignoreParseErrors && failOnMissingField) {
                throw new IllegalArgumentException(
                        "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
            }
            this.resultTypeInfo = checkNotNull(resultTypeInfo);
            this.failOnMissingField = failOnMissingField;
            this.ignoreParseErrors = ignoreParseErrors;
            this.timestampFormat = timestampFormat;
        }

        @Override
        // This is where the real deserialization logic belongs, e.g. flattening
        // nested JSON (turning multiple levels of nesting into a single level).
        // This is the key method to pay attention to.
        public RowData deserialize(byte[] message) throws IOException {
            return null;
        }

        @Override
        public boolean isEndOfStream(RowData nextElement) {
            return false;
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            return resultTypeInfo;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            NullRowDataDeserializationSchema that = (NullRowDataDeserializationSchema) o;
            return failOnMissingField == that.failOnMissingField
                    && ignoreParseErrors == that.ignoreParseErrors
                    && resultTypeInfo.equals(that.resultTypeInfo)
                    && timestampFormat.equals(that.timestampFormat);
        }

        @Override
        public int hashCode() {
            return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
        }
    }
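
In the NullFormat above, deserialize always returns null, so every Kafka record is effectively dropped. For the original goal of flattening nested JSON, the real logic belongs in that method. Below is only a rough sketch, not code from the article: it assumes the rowType constructor argument is additionally stored in a field, treats every column as a string, and looks at most one nesting level deep. A production implementation would convert each field according to its LogicalType, the way the built-in json format does.

    // Sketch only: assumes a `private final RowType rowType;` field was added and
    // initialized in the constructor, plus imports for
    // org.apache.flink.table.data.GenericRowData and org.apache.flink.table.data.StringData.
    @Override
    public RowData deserialize(byte[] message) throws IOException {
        try {
            JsonNode root = objectMapper.readTree(message);
            GenericRowData row = new GenericRowData(rowType.getFieldCount());
            int pos = 0;
            for (String fieldName : rowType.getFieldNames()) {
                JsonNode node = findField(root, fieldName);
                if (node == null || node.isNull()) {
                    if (failOnMissingField) {
                        throw new IOException("Missing field: " + fieldName);
                    }
                    row.setField(pos++, null);
                } else {
                    // simplified: every column is read as a string
                    row.setField(pos++, StringData.fromString(node.asText()));
                }
            }
            return row;
        } catch (Exception e) {
            if (ignoreParseErrors) {
                return null; // drop the record, like the NullFormat does
            }
            throw new IOException("Failed to deserialize: " + new String(message), e);
        }
    }

    // Looks for the field at the top level first, then one nesting level down.
    private static JsonNode findField(JsonNode root, String fieldName) {
        if (root.has(fieldName)) {
            return root.get(fieldName);
        }
        for (JsonNode child : root) {
            if (child.isObject() && child.has(fieldName)) {
                return child.get(fieldName);
            }
        }
        return null;
    }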

4. Using the Custom Format

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    import java.time.Duration;

    public class SqlKafka {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings environmentSettings =
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);

            // enable checkpointing
            Configuration configuration = tableEnv.getConfig().getConfiguration();
            configuration.set(
                    ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
            configuration.set(
                    ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

            String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint," +
                    "info_index int,order_id bigint,tableName String" +
                    ") WITH (" +
                    "'connector' = 'kafka','topic' = 'canal_monitor_order'," +
                    "'properties.bootstrap.servers' = 'bigdata-dev-mq:9092','properties.group.id' = 'testGroup'," +
                    "'format' = 'null','scan.startup.mode' = 'earliest-offset')";
            tableEnv.executeSql(sql);
            ......

'format' = 'null' is the unique identifier of the factory (the IDENTIFIER constant in NullFormatFactory).

With that in place, the job can be executed directly.
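
As a quick sanity check that the custom format is being picked up, a query can be appended to the main method above (a hypothetical continuation, not part of the original article). Since the NullFormat turns every record into null, the source emits nothing and the query prints no rows:

    // hypothetical continuation of main(): with 'format' = 'null' the table stays empty
    tableEnv.executeSql("SELECT * FROM sourcedata").print();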
