How to get an aggregated sum in Spark

ru9i0ody  posted on 2021-06-01  in Hadoop

I'm new to this. I am using Spark 2.2 and have the following input data in JSON format.

{"a_id":6336,"b_sum":10.0,"m_cd":["abc00053"],"td_cnt":[10.0]}
{"a_id":6336,"b_sum":10.0,"m_cd":["abc00053"],"td_cnt":[5.0]}
{"a_id":6336,"b_sum":10.0,"m_cd":["abc00054"],"td_cnt":[20.0]}
{"a_id":6336,"b_sum":10.0,"m_cd":["abc00056"],"td_cnt":[30.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00051"],"td_cnt":[12.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00057"],"td_cnt":[7.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00055"],"td_cnt":[10.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00058"],"td_cnt":[20.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["null"],"td_cnt":[null]}

I want to group the records by the a_id and b_sum columns and collect the m_cd values together with their corresponding td_cnt values into a list like

array(["abc00053":15.0,"abc00054":20.0,"abc00056":30.0]).

Then I want the sum of the td_cnt values as a new td_cnt_sum column in the DataFrame.
Expected output:

{"a_id":6336,"b_sum":10.0,"td_cnt":["abc00053":15.0,"abc00054":20.0,"abc00056":30.0],"td_cnt_sum":65}
{"a_id":6339,"b_sum":10.0,"td_cnt":["abc00051":12,"abc00057":7.0,"abc00055":10.0,"abc00058":20.0],"td_cnt_sum":49}

Please help.

qncylg1j  #1

You can use groupBy/agg with sum once, and then groupBy/agg again with collect_list. Here is an example.
You can read the JSON file as

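import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read azure storage").master("local[*]").getOrCreate()

import spark.implicits._
// spark.read.json expects one JSON object per line by default, which matches the input above
val data = spark.read.json("path to json file ")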

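Because m_cd and td_cnt are loaded as arrays, select the first element if each array always holds exactly one value; otherwise, use explode to turn each array element into its own row.
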
val df = data.select(
    $"a_id", $"b_sum", $"m_cd"(0).as("m_cd"),
    $"td_cnt"(0).as("td_cnt"))
  .na.drop() // drop rows containing a null, such as the record whose td_cnt is [null]
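
If the arrays can hold more than one value, the explode route mentioned above could look roughly like the sketch below. It assumes that m_cd and td_cnt always have the same length and are aligned by position, and it uses posexplode, which I believe is available in Spark 2.2. The sample row and the names sample and exploded are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.posexplode

val spark = SparkSession.builder().appName("explode sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical record carrying two codes and two counts in its arrays
val sample = Seq((6336L, 10.0, Seq("abc00053", "abc00054"), Seq(10.0, 20.0)))
  .toDF("a_id", "b_sum", "m_cd", "td_cnt")

val exploded = sample
  // One output row per m_cd element, together with its position in the array
  .select($"a_id", $"b_sum", posexplode($"m_cd").as(Seq("pos", "m_cd")), $"td_cnt")
  // Pick the td_cnt element at the same position (assumes aligned arrays)
  .select($"a_id", $"b_sum", $"m_cd", $"td_cnt"($"pos").as("td_cnt"))
  .na.drop()

exploded.show(false)
```

The result has the same flat columns as df, so the aggregation step that follows applies unchanged.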

Use groupBy twice to get the output:

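import org.apache.spark.sql.functions.{collect_list, struct, sum}

// First sum td_cnt per (a_id, m_cd), then collect the per-code sums and total them per a_id
val df1 = df.groupBy("a_id", "m_cd")
  .agg(sum("td_cnt").as("td_cnt_sum"))
  .groupBy("a_id")
  .agg(
    collect_list(struct("m_cd", "td_cnt_sum")).as("td_cnt"),
    sum("td_cnt_sum").as("td_cnt_sum"))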

df1.show(false)
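
The question asks to group by both a_id and b_sum, while the code above groups by a_id only. A minimal sketch that carries b_sum through both aggregation steps might look like this; the flat DataFrame is a hand-written stand-in for df built from the question's non-null rows, and the name withBSum is made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct, sum}

val spark = SparkSession.builder().appName("group by a_id and b_sum").master("local[*]").getOrCreate()
import spark.implicits._

// Stand-in for df: the question's rows after taking element 0 and dropping nulls
val flat = Seq(
  (6336L, 10.0, "abc00053", 10.0),
  (6336L, 10.0, "abc00053", 5.0),
  (6336L, 10.0, "abc00054", 20.0),
  (6336L, 10.0, "abc00056", 30.0),
  (6339L, 10.0, "abc00051", 12.0),
  (6339L, 10.0, "abc00057", 7.0),
  (6339L, 10.0, "abc00055", 10.0),
  (6339L, 10.0, "abc00058", 20.0)
).toDF("a_id", "b_sum", "m_cd", "td_cnt")

val withBSum = flat
  // Step 1: one row per (a_id, b_sum, m_cd) with the summed td_cnt
  .groupBy("a_id", "b_sum", "m_cd")
  .agg(sum("td_cnt").as("td_cnt_sum"))
  // Step 2: collect the per-code sums and add them up per (a_id, b_sum)
  .groupBy("a_id", "b_sum")
  .agg(
    collect_list(struct("m_cd", "td_cnt_sum")).as("td_cnt"),
    sum("td_cnt_sum").as("td_cnt_sum"))

withBSum.show(false)
```

Since b_sum is constant within each a_id in the sample data, this yields the same two groups as before, with b_sum kept as an extra column.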

Output:

+----+-------------------------------------------------------------------+----------+
|a_id|td_cnt                                                             |td_cnt_sum|
+----+-------------------------------------------------------------------+----------+
|6336|[[abc00056,30.0], [abc00053,15.0], [abc00054,20.0]]                |65.0      |
|6339|[[abc00055,10.0], [abc00051,12.0], [abc00057,7.0], [abc00058,20.0]]|49.0      |
+----+-------------------------------------------------------------------+----------+

Now that you have the output, you can adapt it to your requirements. Schema:

root
 |-- a_id: long (nullable = true)
 |-- td_cnt: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- m_cd: string (nullable = true)
 |    |    |-- td_cnt_sum: double (nullable = true)
 |-- td_cnt_sum: double (nullable = true)