我是 Spark 新手,使用的是 Spark 2.2 版本。我有以下 JSON 格式的输入数据:
{"a_id":6336,"b_sum":10.0,"m_cd":["abc00053"],"td_cnt":[10.0]}
{"a_id":6336,"b_sum":10.0,"m_cd":["abc00053"],"td_cnt":[5.0]}
{"a_id":6336,"b_sum":10.0,"m_cd":["abc00054"],"td_cnt":[20.0]}
{"a_id":6336,"b_sum":10.0,"m_cd":["abc00056"],"td_cnt":[30.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00051"],"td_cnt":[12.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00057"],"td_cnt":[7.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00055"],"td_cnt":[10.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["abc00058"],"td_cnt":[20.0]}
{"a_id":6339,"b_sum":10.0,"m_cd":["null"],"td_cnt":[null]}
我想按 a_id 和 b_sum 列对记录分组,并把 m_cd 与对应的 td_cnt 收集成如下数组(同一个 m_cd 的 td_cnt 需要求和,例如 abc00053 为 10.0 + 5.0 = 15.0):
array(["abc00053":15.0,"abc00054":20.0,"abc00056":30.0]).
然后将 td_cnt 值的总和作为 DataFrame 中的新列 td_cnt_sum。
预期输出:
{"a_id":6336,"b_sum":10.0,"td_cnt":["abc00053":15.0,"abc00054":20.0,"abc00056":30.0],"td_cnt_sum":65}
{"a_id":6339,"b_sum":10.0,"td_cnt":["abc00051":12.0,"abc00057":7.0,"abc00055":10.0,"abc00058":20.0],"td_cnt_sum":49}
请帮帮我。
1条答案
按热度按时间qncylg1j1#
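你可以先用 `groupBy/agg` 配合 `sum` 聚合,再用一次 `groupBy/agg` 配合 `collect_list` 收集结果。下面是一个示例。
您可以先将 json 文件读取为 DataFrame(例如 `val data = spark.read.json("path/to/input.json")`,路径仅为示例)。
由于你的数据中 `m_cd` 和 `td_cnt` 被加载为 array:如果它们始终只有一个值,只需选择第一个元素即可;否则需要用 explode 将 array 中的值展开为多行。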
```scala
// Input: `data` is the DataFrame read from the JSON lines above, e.g.
//   val data = spark.read.json("path/to/input.json")  // placeholder path
// spark-shell already imports these; the explicit import keeps the snippet
// compilable inside an application as well.
import org.apache.spark.sql.functions.{col, collect_list, expr, posexplode, struct, sum}

// m_cd and td_cnt are loaded as parallel arrays. posexplode emits one row per
// m_cd element together with its index (`pos`), and td_cnt[pos] picks the
// matching count, so arrays holding more than one element are not truncated
// to their first item.
val df = data
  .select(col("a_id"), col("b_sum"), col("td_cnt"), posexplode(col("m_cd")))
  .select(
    col("a_id"),
    col("b_sum"),
    col("col").as("m_cd"),
    expr("td_cnt[pos]").as("td_cnt"))
  // Drop rows with a null in any column, e.g. {"m_cd":["null"],"td_cnt":[null]}.
  .na.drop()

// Group on both keys the question asks for (a_id, b_sum): first add up td_cnt
// per m_cd (abc00053 -> 10.0 + 5.0 = 15.0), then gather the per-code sums into
// one array and total them.
val df1 = df
  .groupBy("a_id", "b_sum", "m_cd")
  .agg(sum("td_cnt").as("td_cnt_sum"))
  .groupBy("a_id", "b_sum")
  .agg(
    // collect_list gives no ordering guarantee after the shuffle.
    collect_list(struct("m_cd", "td_cnt_sum")).as("td_cnt"),
    sum("td_cnt_sum").as("td_cnt_sum"))

df1.show(false)
df1.printSchema()

// Example output (row order and element order inside td_cnt may vary):
// +----+-----+-------------------------------------------------------------------+----------+
// |a_id|b_sum|td_cnt                                                             |td_cnt_sum|
// +----+-----+-------------------------------------------------------------------+----------+
// |6336|10.0 |[[abc00056,30.0], [abc00053,15.0], [abc00054,20.0]]                |65.0      |
// |6339|10.0 |[[abc00055,10.0], [abc00051,12.0], [abc00057,7.0], [abc00058,20.0]]|49.0      |
// +----+-----+-------------------------------------------------------------------+----------+
//
// root
//  |-- a_id: long (nullable = true)
//  |-- b_sum: double (nullable = true)
//  |-- td_cnt: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- m_cd: string (nullable = true)
//  |    |    |-- td_cnt_sum: double (nullable = true)
//  |-- td_cnt_sum: double (nullable = true)
```