如何在spark java中将行中的struct字段转换为avro记录

n7taea2i  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(458)

我有一个用例,我想把一个struct字段转换成avro记录。struct字段最初Map到avro类型。输入数据是avro文件,struct字段对应于输入avro记录中的一个字段。
下面是我想用伪代码实现的。

DataSet<Row> data = loadInput(); // data is of form (foo, bar, myStruct) from avro data. 

// do some joins to add more data
data = doJoins(data); // now data is of form (a, b, myStruct)

// transform DataSet<Row> to DataSet<MyType> 
DataSet<MyType> myData = data.map(row -> myUDF(row), encoderOfMyType);

// method `myUDF` definition
MyType myUDF(Row row) {
  String a = row.getAs("a");
  String b = row.getAs("b");

  // MyStruct is the generated avro class that corresponds to field myStruct 
  MyStruct myStruct = convertToAvro(row.getAs("myStruct"));

  return generateMyType(a, b, myStruct);
}

我的问题是:如何实施 convertToAvro 上述伪代码中的方法?

ax6ht2ek

ax6ht2ek1#

根据文件:
avro包提供函数给\u avro将一列编码为avro格式的二进制,并从\u avro()将avro二进制数据解码为一列。这两个函数都将一列转换为另一列,并且输入/输出sql数据类型可以是复杂类型或基元类型。
_avro的函数充当 convertToAvro 方法:

import static org.apache.spark.sql.avro.functions.*;

//put the avro schema of the struct column into a string
//in my example I assume that the struct consists of a two fields:
//a long field (s1) and a string field (s2)
String schema = "{\"type\":\"record\",\"name\":\"mystruct\"," +
        "\"namespace\":\"topLevelRecord\",\"fields\":[{\"name\":\"s1\"," +
        "\"type\":[\"long\",\"null\"]},{\"name\":\"s2\",\"type\":" +
        "[\"string\",\"null\"]}]},\"null\"]}";

data = ...

//add an additional column containing the struct as binary column
Dataset<Row> data2 = df.withColumn("to_avro", to_avro(data.col("myStruct"), schema));
df2.printSchema();
df2.show(false);

印刷品

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)

+----+----+----------+----------------------------+
|a   |b   |mystruct  |to_avro                     |
+----+----+----------+----------------------------+
|foo1|bar1|[1, one]  |[00 02 00 06 6F 6E 65]      |
|foo2|bar2|[3, three]|[00 06 00 0A 74 68 72 65 65]|
+----+----+----------+----------------------------+

要将avro列转换回,可以使用来自\u avro的函数:

Dataset<Row> data3 = data2.withColumn("from_avro", from_avro(data2.col("to_avro"), schema));
df3.printSchema();
df3.show();

输出:

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)
 |-- from_avro: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)

+----+----+----------+--------------------+----------+
|   a|   b|  mystruct|             to_avro| from_avro|
+----+----+----------+--------------------+----------+
|foo1|bar1|  [1, one]|[00 02 00 06 6F 6...|  [1, one]|
|foo2|bar2|[3, three]|[00 06 00 0A 74 6...|[3, three]|
+----+----+----------+--------------------+----------+

关于udf的一句话:在这个问题中,您在udf中执行了到avro格式的转换。我更希望在udf中只包含实际的业务逻辑,并将格式转换保留在外部。这将逻辑和格式转换分开。如果需要,可以删除原始列 mystruct 在创建avro列之后。

相关问题