对新列使用不同的avro模式

jw5wzhpr  于 2021-06-04  发布在  Flume
关注(0)|答案(2)|浏览(547)

我使用flume+kafka将日志数据下沉到hdfs。我的接收器数据类型是avro。在avro模式(.avsc)中,有80个字段作为列。
所以我创建了一个这样的外部表

CREATE external TABLE pgar.tiz_biaws_fraud
PARTITIONED BY(partition_date INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/data/datapool/flume/biaws/fraud'
TBLPROPERTIES ('avro.schema.url'='hdfs://xxxx-ns/data/datapool/flume/biaws/fraud.avsc')

现在,我需要向avro模式再添加25列。那样的话,
如果我用新模式创建一个有105列的新表,那么一个项目将有两个表。如果我在未来几天添加或删除一些列,我必须为此创建一个新表。我害怕有很多表在同一个项目中使用不同的模式。
如果我用当前表中的新模式替换旧模式,一个项目只能有一个表,但由于模式冲突,我无法读取和获取旧数据。
在这种情况下,使用avro模式的最佳方法是什么?

ohtdti5x

ohtdti5x1#

这确实具有挑战性。最好的方法是确保您所做的所有架构更改都与旧数据兼容—因此只删除具有默认值的列,并确保您在添加的列中提供默认值。这样,您可以安全地交换模式而不发生冲突,并继续读取旧数据。avro在这方面非常聪明,它被称为“模式进化”(如果你想在google上搜索更多的话),它允许读写器模式有点不同。
另一方面,我想提到的是,kafka有一个本地hdfs连接器(即没有flume),它使用confluent的模式注册表自动处理这些类型的模式更改—您可以使用注册表检查模式是否兼容,如果是这样的话,只需使用新模式写入数据,配置单元表就会自动演化为匹配。

kx5bkwkv

kx5bkwkv2#

我像那样向avro模式添加了新列

{"name":"newColumn1", "type": "string", "default": ""},
{"name":"newColumn2", "type": "string", "default": ""},
{"name":"newColumn3", "type": "string", "default": ""},

当我使用 default 属性,如果当前数据中不存在该列,则返回默认值;如果当前数据中确实存在该列,则按预期返回数据值。
要将空值设置为默认值,需要

{ "name": "newColumn4", "type": [ "string", "null" ], "default": "null" },

或者

{ "name": "newColumn5", "type": [ "null", "string" ]},

null在类型属性中的位置,可以是第一位,也可以是具有默认属性的第二位。

相关问题