Flink: how to create a table using the schema inferred from Avro input data

Posted by mmvthczy on 2021-06-21 in Flink

I loaded an Avro file into a Flink DataSet:

    AvroInputFormat<GenericRecord> test = new AvroInputFormat<>(
            new Path("PathToAvroFile"),
            GenericRecord.class);
    DataSet<GenericRecord> DS = env.createInput(test);
    DS.print();

Here is the result of printing DS:

    {"N_NATIONKEY": 14, "N_NAME": "KENYA", "N_REGIONKEY": 0, "N_COMMENT": " pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t"}
    {"N_NATIONKEY": 15, "N_NAME": "MOROCCO", "N_REGIONKEY": 0, "N_COMMENT": "rns. blithely bold courts among the closely regular packages use furiously bold platelets?"}
    {"N_NATIONKEY": 16, "N_NAME": "MOZAMBIQUE", "N_REGIONKEY": 0, "N_COMMENT": "s. ironic, unusual asymptotes wake blithely r"}
    {"N_NATIONKEY": 17, "N_NAME": "PERU", "N_REGIONKEY": 1, "N_COMMENT": "platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun"}
    {"N_NATIONKEY": 18, "N_NAME": "CHINA", "N_REGIONKEY": 2, "N_COMMENT": "c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos"}
    {"N_NATIONKEY": 19, "N_NAME": "ROMANIA", "N_REGIONKEY": 3, "N_COMMENT": "ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account"}
    {"N_NATIONKEY": 20, "N_NAME": "SAUDI ARABIA", "N_REGIONKEY": 4, "N_COMMENT": "ts. silent requests haggle. closely express packages sleep across the blithely"}

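Now I want to create a table from the DS DataSet with exactly the same schema as the Avro file, that is, the columns should be N_NATIONKEY, N_NAME, N_REGIONKEY, and N_COMMENT.
I know that with this line: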

    tableEnv.registerDataSet("tbTest", DS, "field1, field2, ...");

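I can create a table and set the columns, but I would like the columns to be inferred automatically from the data. Is that possible? I also tried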

    tableEnv.registerDataSet("tbTest", DS);

but it created a table with the following schema:

    root
     |-- f0: GenericType<org.apache.avro.generic.GenericRecord>

iq0todco #1

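GenericRecord is a black box for the Table & SQL API runtime, because the number of fields and their data types are undefined. I would recommend using an Avro-generated class that implements SpecificRecord. These specific types are also recognized by Flink's type system, so you can address the individual fields with their proper data types.
Alternatively, you could implement custom UDFs that extract a field with the proper data type, such as getAvroInt(f0, "myField"), getAvroString(f0, "myField"), etc.
A rough sketch: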

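    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.table.functions.ScalarFunction;

    // Scalar UDF that returns one field of a GenericRecord as a String.
    public class AvroStringFieldExtract extends ScalarFunction {
        public String eval(GenericRecord r, String fieldName) {
            Object value = r.get(fieldName);  // null if the field is unset
            return value == null ? null : value.toString();
        }
    }

    tableEnv.registerFunction("getAvroFieldString", new AvroStringFieldExtract());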
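
The answer names typed extractors such as getAvroInt and getAvroString but only shows a string variant. Below is a minimal sketch of the typed-getter idea in plain Java. It assumes a `Map<String, Object>` as a stand-in for `GenericRecord` so that it runs without Avro or Flink on the classpath; the class `TypedFieldExtractors` and all of its method names are hypothetical and not part of either library. In a real job, the bodies of `getInt` and `getString` would presumably move into the `eval` methods of separate `ScalarFunction` subclasses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: a Map plays the role of an untyped GenericRecord.
public class TypedFieldExtractors {

    // Counterpart of a getAvroString-style UDF: any value is rendered as text.
    // Avro typically returns strings as a CharSequence implementation rather
    // than java.lang.String, which is why toString() is used instead of a cast.
    public static String getString(Map<String, Object> record, String fieldName) {
        Object value = record.get(fieldName);
        return value == null ? null : value.toString();
    }

    // Counterpart of a getAvroInt-style UDF: only numeric values are accepted.
    public static Integer getInt(Map<String, Object> record, String fieldName) {
        Object value = record.get(fieldName);
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        throw new IllegalArgumentException(
                "Field " + fieldName + " is not numeric: " + value.getClass().getName());
    }

    // Sample row modelled on the first record printed in the question.
    public static Map<String, Object> sampleRecord() {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("N_NATIONKEY", 14);
        // StringBuilder mimics a non-String CharSequence value.
        record.put("N_NAME", new StringBuilder("KENYA"));
        record.put("N_REGIONKEY", 0);
        return record;
    }

    public static void main(String[] args) {
        Map<String, Object> record = sampleRecord();
        // prints: 14 KENYA
        System.out.println(getInt(record, "N_NATIONKEY") + " " + getString(record, "N_NAME"));
    }
}
```

One extractor per target type keeps the return type of each function fixed, which is what lets a SQL planner assign a proper column type to the result instead of treating it as a generic object.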
