如何使用avro模式创建Flink表,其中所有列都从模式推断

inn6fuwd  于 2024-01-04  发布在  Apache
关注(0)|答案(1)|浏览(194)

我正在跟随https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/avro/在Flink中创建表格。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'avro'
)

字符串
然而,我的avro模式有太多的字段(100列)。删除每个字段都很耗时。有没有什么方法可以让Flink从模式中推断并自动为每个字段创建一列?

uwopmtnx

uwopmtnx1#

我不知道如何从Avro模式自动推断SQL模式。
这可以帮助您:

import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter
import org.apache.flink.table.api.Schema
 
val s = AvroSchemaConverter.convertToDataType(your_shema_json)
val schema = Schema.newBuilder().fromRowDataType(s).build()

println(schema)

字符串

相关问题