Adding new fields to a JSON column

cfh9epnr · published 2021-07-12 in Spark

I am using Spark to read rows from Cassandra, where one of the columns is a JSON object. Here are the schema and a sample row from the dataset:

root
 |-- attributes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- score: double (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- col1: string (nullable = true)

+--------------------------------------------------------------------------------+--------+---------+
|attributes                                                                      |id      |col1     |
+--------------------------------------------------------------------------------+--------+---------+
|[[usage,2,5.0,12], [price,1,10.0,48], [hair,1,10.0,23737], [curls,1,10.0,30807]]|19400335|val_str_1|
+--------------------------------------------------------------------------------+--------+---------+

The table is loaded with the following schema:

ArrayType schema =
                DataTypes.createArrayType(
                        new StructType(
                                new StructField[] {
                                        new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                                        new StructField("count", DataTypes.IntegerType, true, Metadata.empty()),
                                        new StructField("score", DataTypes.DoubleType, true, Metadata.empty()),
                                        new StructField("id", DataTypes.IntegerType, true, Metadata.empty())}));

Is it possible to inject new fields into each struct in the attributes column, taking one value from the id column and one from the col1 column?

1tu0hz3e

You can try using the transform function to update each struct element in the array:

df.withColumn("attributes", expr("transform(attributes, x -> struct(x.name as name, x.count as count, x.score as score, x.id as id, id as root_id, col1 as root_col1))"))
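To make the effect of that transform expression concrete, here is a minimal plain-Python sketch (not Spark, just an illustration of the per-row logic) using the sample row from the question: each struct in the array keeps its original fields and gains the row-level id and col1 values as root_id and root_col1.

```python
# Sample row shaped like the dataset in the question (values abbreviated).
row = {
    "attributes": [
        {"name": "usage", "count": 2, "score": 5.0, "id": 12},
        {"name": "price", "count": 1, "score": 10.0, "id": 48},
    ],
    "id": "19400335",
    "col1": "val_str_1",
}

# Equivalent of transform(attributes, x -> struct(x.*, id as root_id, col1 as root_col1)):
# rebuild each struct with the top-level columns appended.
row["attributes"] = [
    {**attr, "root_id": row["id"], "root_col1": row["col1"]}
    for attr in row["attributes"]
]

print(row["attributes"][0])
# {'name': 'usage', 'count': 2, 'score': 5.0, 'id': 12, 'root_id': '19400335', 'root_col1': 'val_str_1'}
```

In Spark the same thing happens once per array element, per row, without exploding the array.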

For Spark < 2.4, you can explode the array column, update each struct, and then group by to collect the list again:

df.withColumn("attr_structs", explode(col("attributes")))
    .withColumn("attr_structs", struct(col("attr_structs.*"), col("id").alias("root_id"), col("col1").alias("root_col1")))
    .groupBy("id", "col1")
    .agg(collect_list(col("attr_structs")).alias("attributes"))
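For intuition, the explode-then-collect fallback can be sketched in plain Python (names like rows and exploded are illustrative, not part of the Spark API): step 1 flattens each array element into its own record, step 2 groups by (id, col1) and gathers the enriched structs back into a list.

```python
from collections import defaultdict

rows = [
    {
        "attributes": [
            {"name": "usage", "count": 2, "score": 5.0, "id": 12},
            {"name": "price", "count": 1, "score": 10.0, "id": 48},
        ],
        "id": "19400335",
        "col1": "val_str_1",
    },
]

# 1) explode: one record per struct in the array, with the
#    top-level columns attached (root_id, root_col1).
exploded = [
    {**attr, "root_id": r["id"], "root_col1": r["col1"],
     "_key": (r["id"], r["col1"])}
    for r in rows
    for attr in r["attributes"]
]

# 2) groupBy(id, col1) + collect_list: gather the structs back.
grouped = defaultdict(list)
for rec in exploded:
    key = rec.pop("_key")
    grouped[key].append(rec)

result = [
    {"id": k[0], "col1": k[1], "attributes": structs}
    for k, structs in grouped.items()
]
```

Note that, unlike the transform approach, this shuffles the data for the groupBy, so on large tables the Spark >= 2.4 version is preferable.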
