我有一个Memos的Spark Dataframe,每个memo都有一个id和相关的贷方和借方金额,来自getData调用(在Scala中):
val memoData: Dataset[Row] = getData()
| 备忘录ID|量|贷记或借记|日期|memo_attr_1|
| --|--|--|--|--|
| MEM_1| 100 |D| 2023-06-01| X|
| MEM_1| 20 |C| 2023-06-07 2023-06-07| X|
| MEM_2| 200 |D| 2023-06-01| Y|
| MEM_2| 50 |C| 2023-06-07 2023-06-07| Y|
| MEM_3| 300 |D| 2023-06-01| Z|
我想准备一个新的框架 * 只有一行为每个独特的备忘录 *,调整信贷和借记到最终金额。还需要将行别名为新名称(例如memo_id
应称为memo_number
),保留原始备忘录的一些属性:
val computedData = compute(memoData)
| 备忘录号码|计算量|贷记或借记|memo_attr_1|
| --|--|--|--|
| MEM_1| 80 |D| X|
| MEM_2| 150 |D| Y|
| MEM_3| 300 |D| Z|
想了解如何使用Spark API实现这一点吗?
如果你能帮忙的话,我将不胜感激。
我想注册一个临时表视图,并通过SQL完成,查看每个唯一的备忘录:
view = data.createOrReplaceTempView(TABLE_ALIAS)
memos = data.select("memo_id").distinct().collect().toList
for (memo <- memos) {
val adjustedRow = sparkSession.sql(s"SELECT ....")
}
然而,由于我们需要保留一些原始属性,因此在创建调整行时遇到了困难。
2条答案
按热度按时间rqenqsqc1#
一些分组和聚合应该可以完成这项工作。范例:
这将为每个
memo_id
组选取第一个属性值。如果你想得到所有的属性,那么用collect_set
函数替换first
。watbbzwu2#
Memo定义在顶层(它不能在对象、类或trait等中):
产量:
我假设memo_attr_1可以是不同的,所以它是一个收集集,以保持他们所有。
请注意,你最初的想法是为每一行生成新的转换框,如果可能的话,你希望将转换保持在同一个数据集中,让spark一次性完成转换。
如果有用的话,您可以通过对每个字段都在其中的结构使用collect_set(或list)来保留所有匹配备注行的原始字段。