Scala: how to reduce rows to a single row per group, by condition, in Spark

hyrbngr7 · posted 2023-10-18 · Scala

I have a Spark DataFrame of memos. Each memo has an id and associated credit and debit amounts, returned from a getData call (in Scala):

val memoData: Dataset[Row] = getData()

| memo_id | amount | credit_or_debit | date | memo_attr_1 |
| -- | -- | -- | -- | -- |
| MEM_1 | 100 | D | 2023-06-01 | X |
| MEM_1 | 20 | C | 2023-06-07 | X |
| MEM_2 | 200 | D | 2023-06-01 | Y |
| MEM_2 | 50 | C | 2023-06-07 | Y |
| MEM_3 | 300 | D | 2023-06-01 | Z |

I want to build a new DataFrame with *exactly one row per unique memo*, netting the credits and debits into a final amount. The columns also need new aliases (e.g. memo_id should become memo_number), while keeping some of the original memo's attributes:

val computedData = compute(memoData)

| memo_number | computed_amount | credit_or_debit | memo_attr_1 |
| -- | -- | -- | -- |
| MEM_1 | 80 | D | X |
| MEM_2 | 150 | D | Y |
| MEM_3 | 300 | D | Z |

How can this be done with the Spark API?
Any help would be much appreciated.
My idea was to register a temp view and do it through SQL, looking at each unique memo:

data.createOrReplaceTempView(TABLE_ALIAS)

val memos = data.select("memo_id").distinct().collect().toList

for (memo <- memos) {
  val adjustedRow = sparkSession.sql(s"SELECT ....")
}

However, since we need to keep some of the original attributes, I'm having trouble building the adjusted row.
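For reference, the per-memo loop isn't needed: the whole adjustment can be expressed as a single GROUP BY query against the temp view. A sketch, using a hypothetical view name "memos" and the column names from the question; picking an arbitrary attribute per group via first(...) is an assumption:

```scala
data.createOrReplaceTempView("memos")

// One pass: flip credits negative, sum per memo, then derive the sign label.
val computed = sparkSession.sql("""
  SELECT memo_id AS memo_number,
         abs(sum(if(credit_or_debit = 'C', -amount, amount))) AS computed_amount,
         if(sum(if(credit_or_debit = 'C', -amount, amount)) >= 0, 'D', 'C') AS credit_or_debit,
         first(memo_attr_1) AS memo_attr_1
  FROM memos
  GROUP BY memo_id
""")
```

This keeps the work inside one Spark job instead of one query per memo.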

rqenqsqc

Some grouping and aggregation should do the job. Example:

import org.apache.spark.sql.functions.{col, when, sum, first}
import sparkSession.implicits._ // for .toDF on a Seq

val data = Seq(
  ("MEM_1", 100, "D", "2023-06-01", "X"),
  ("MEM_1", 20, "C", "2023-06-07", "X"),
  ("MEM_2", 200, "D", "2023-06-01", "Y"),
  ("MEM_2", 50, "C", "2023-06-07", "Y"),
  ("MEM_3", 300, "D", "2023-06-01", "Z")
).toDF("memo_id", "amount", "credit_or_debit", "date", "memo_attr_1")

val computedData = data.withColumn(
    "amount",
    when(col("credit_or_debit") === "C", col("amount") * -1).otherwise(col("amount"))
  ).groupBy(
    col("memo_id").as("memo_number")
  ).agg(
    sum("amount").as("computed_amount"),
    Seq("memo_attr_1").map(c => first(c).as(c)):_*
  ).withColumn(
    "credit_or_debit", 
    when(col("computed_amount") >= 0, "D").otherwise("C")
  )

computedData.show()

//+-----------+---------------+---------------+-----------+
//|memo_number|computed_amount|credit_or_debit|memo_attr_1|
//+-----------+---------------+---------------+-----------+
//|      MEM_1|             80|              D|          X|
//|      MEM_2|            150|              D|          Y|
//|      MEM_3|            300|              D|          Z|
//+-----------+---------------+---------------+-----------+

This picks the first attribute value for each memo_id group. If you want to keep all the attribute values, replace first with the collect_set function.
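That swap could look like the following (a sketch against the data DataFrame defined above; collect_set returns an array of the distinct values per group):

```scala
import org.apache.spark.sql.functions.{col, when, sum, collect_set}

val allAttrs = data
  .withColumn("amount",
    when(col("credit_or_debit") === "C", col("amount") * -1).otherwise(col("amount")))
  .groupBy(col("memo_id").as("memo_number"))
  .agg(
    sum("amount").as("computed_amount"),
    collect_set("memo_attr_1").as("memo_attr_1") // array of distinct attrs per memo
  )
```

Use collect_list instead if duplicates should be preserved.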

watbbzwu

Memo is defined at the top level (it can't be nested inside an object, class, trait, etc.):

import java.sql.Date

import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{sum, collect_set}
import sparkSession.implicits._

case class Memo(memo_id: String, amount: Int, credit_or_debit: String, date: Date, memo_attr_1: String)

def date(year: Int, month: Int, day: Int): Date = Date.valueOf(s"$year-$month-$day")

def getData(): Dataset[Row] =
  Seq(
    Memo("MEM_1", 100, "D", date(2023, 6, 1),"X"),
    Memo("MEM_1", 20, "C", date(2023, 6, 7), "X"),
    Memo("MEM_2", 200, "D", date(2023, 6, 1), "Y"),
    Memo("MEM_2", 50, "C", date(2023, 6, 7) , "Y"),
    Memo("MEM_3", 300, "D", date(2023, 6, 1), "Z")
  ).toDS().toDF()

val memoData = getData()

def computeData(dataFrame: DataFrame): DataFrame =
  // make debits negative so they can be summed
  dataFrame.selectExpr("memo_id", "memo_attr_1", "if(credit_or_debit = 'D', amount * -1, amount) amount_for_sum")
    // group by memo_id then sum whilst keeping the memo_attr_1's
    .groupBy("memo_id").agg(sum("amount_for_sum").as("summed_amount"), collect_set("memo_attr_1").as("memo_attr_1"))
    // renaming columns and making debit's show as positive and D
    .selectExpr("memo_id memo_number", "if(summed_amount < 0, summed_amount * -1, summed_amount) computed_amount", "if(summed_amount < 0, 'D', 'C') credit_or_debit", "memo_attr_1")

val computedData = computeData(memoData)

computedData.show()

Output:

+-----------+---------------+---------------+-----------+
|memo_number|computed_amount|credit_or_debit|memo_attr_1|
+-----------+---------------+---------------+-----------+
|      MEM_1|             80|              D|        [X]|
|      MEM_2|            150|              D|        [Y]|
|      MEM_3|            300|              D|        [Z]|
+-----------+---------------+---------------+-----------+

I assumed memo_attr_1 can differ within a memo, so collect_set is used to keep all the values.
Note that your original idea generates a new transformed frame per row; where possible you want to keep the transformation within a single Dataset and let Spark do it in one pass.
If it's useful, you can also preserve every original field of all matching memo rows by applying collect_set (or collect_list) to a struct containing each field.
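A sketch of that last idea, reusing the sign convention from computeData above (collect_list of a struct keeps one element per original row, fields and all):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{sum, collect_list, struct, expr}

def computeDataKeepAll(df: DataFrame): DataFrame =
  df.groupBy("memo_id")
    .agg(
      sum(expr("if(credit_or_debit = 'C', -amount, amount)")).as("summed_amount"),
      // keep every original row's fields as an array of structs
      collect_list(struct("amount", "credit_or_debit", "date", "memo_attr_1")).as("original_rows")
    )
    .selectExpr(
      "memo_id AS memo_number",
      "abs(summed_amount) AS computed_amount",
      "if(summed_amount >= 0, 'D', 'C') AS credit_or_debit",
      "original_rows"
    )
```

The original_rows column can later be exploded back out if the detail is needed downstream.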
