sql - Using a DataFrame column's value as the GROUP BY clause in Spark

Posted by mctunoxg on 2021-05-27 in Spark

I have data that looks like the table below:

    +----+----+--------+-------+--------+----------------------+
    |User|Shop|Location| Seller|Quantity|         GroupBYClause|
    +----+----+--------+-------+--------+----------------------+
    |   1| ABC|    Loc1|Seller1|      10|        Shop, location|
    |   1| ABC|    Loc1|Seller2|      10|        Shop, location|
    |   2| ABC|    Loc1|Seller1|      10|Shop, location, Seller|
    |   2| ABC|    Loc1|Seller2|      10|Shop, location, Seller|
    |   3| BCD|    Loc1|Seller1|      10|              location|
    |   3| BCD|    Loc1|Seller2|      10|              location|
    |   3| CDE|    Loc2|Seller3|      10|              location|
    +----+----+--------+-------+--------+----------------------+

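The expected output is the same data with one additional column, Sum(Quantity), where the quantity is summed over the grouping that each user specifies in GroupBYClause.
For example, user 1 specifies the GroupBYClause "Shop, location", so the seller is ignored and Sum(Quantity) for user 1 is 20.
Similarly, user 2 specifies "Shop, location, Seller", so Sum(Quantity) is 10 for each row.
Expected output: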

    +------+----+--------+-------+--------+----------------------+-------------+
    |UserId|Shop|location| Seller|Quantity|         GroupBYClause|Sum(Quantity)|
    +------+----+--------+-------+--------+----------------------+-------------+
    |     1| ABC|    Loc1|Seller1|      10|        Shop, location|           20|
    |     1| ABC|    Loc1|Seller2|      10|        Shop, location|           20|
    |     2| ABC|    Loc1|Seller1|      10|Shop, location, Seller|           10|
    |     2| ABC|    Loc1|Seller2|      10|Shop, location, Seller|           10|
    |     3| BCD|    Loc1|Seller1|      10|              location|           20|
    |     3| BCD|    Loc1|Seller2|      10|              location|           20|
    |     3| CDE|    Loc2|Seller3|      10|              location|           10|
    +------+----+--------+-------+--------+----------------------+-------------+

The challenge I'm facing is using a column's value as the GROUP BY clause in Spark.
Please help.

    val df = spark.createDataFrame(Seq(
      (1, "ABC", "Loc1", "Seller1", 10, "Shop, location"),
      (1, "ABC", "Loc1", "Seller2", 10, "Shop, location"),
      (2, "ABC", "Loc1", "Seller1", 10, "Shop, location, Seller"),
      (2, "ABC", "Loc1", "Seller2", 10, "Shop, location, Seller"),
      (3, "BCD", "Loc1", "Seller1", 10, "location"),
      (3, "BCD", "Loc1", "Seller2", 10, "location"),
      (3, "CDE", "Loc2", "Seller3", 10, "location")
    )).toDF("UserId", "Shop", "Location", "Seller", "Quantity", "GroupBYClause")

Answer #1 (ohfgkhjo)

Try this:

Load the provided test data

    df1.show(false)
    df1.printSchema()
    /**
      * +----+----+--------+-------+--------+----------------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |
      * +----+----+--------+-------+--------+----------------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |
      * |3   |BCD |Loc1    |Seller1|10      |location              |
      * |3   |BCD |Loc1    |Seller2|10      |location              |
      * |3   |CDE |Loc2    |Seller3|10      |location              |
      * +----+----+--------+-------+--------+----------------------+
      *
      * root
      *  |-- User: integer (nullable = true)
      *  |-- Shop: string (nullable = true)
      *  |-- Location: string (nullable = true)
      *  |-- Seller: string (nullable = true)
      *  |-- Quantity: integer (nullable = true)
      *  |-- GroupBYClause: string (nullable = true)
      */

Compute the sum

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._ // enables the $"col" syntax

    // One predicate per supported clause, evaluated on the split clause array
    val isShopLocation = Seq("Shop", "location").map(array_contains($"arr", _)).reduce(_ && _)
    val isShopLocationSeller = Seq("Shop", "location", "Seller").map(array_contains($"arr", _)).reduce(_ && _)
    val isLocation = array_contains($"arr", "location")
    df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*"))
      // Replace the column names with this row's values; the most specific clause is checked first
      .withColumn("arr",
        when(isShopLocationSeller, expr("array(Shop, location, Seller)"))
          .when(isShopLocation, expr("array(Shop, location)"))
          .when(isLocation, expr("array(location)"))
      ).withColumn("sum_quantity",
        sum("Quantity").over(Window.partitionBy("User", "arr")))
      .show(false)
    /**
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |sum_quantity|
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |20          |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |20          |
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|10          |
      * |3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |10          |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|10          |
      * |3   |BCD |Loc1    |Seller1|10      |location              |[Loc1]              |20          |
      * |3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |20          |
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      */

Define the partition dynamically

    // Key/value pairs for a map from each groupable column name to this row's value
    val columns = Seq("Shop", "location", "Seller").flatMap(f => Seq(lit(f), col(f)))
    df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*"))
      .withColumn("map1", map(columns: _*))
      // TRANSFORM is a higher-order SQL function (Spark 2.4+); it swaps each name for its value
      .withColumn("arr", expr("TRANSFORM(arr, x -> map1[x])"))
      .withColumn("sum_quantity",
        sum("Quantity").over(Window.partitionBy("User", "arr")))
      .show(false)
    /**
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |map1                                              |sum_quantity|
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20          |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20          |
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10          |
      * |3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10          |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10          |
      * |3   |BCD |Loc1    |Seller1|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|20          |
      * |3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|20          |
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      */

Edit 1 (based on comments)

    // input
    +----+----+--------+-------+--------+----------------------+
    |User|Shop|Location|Seller |Quantity|GroupBYClause         |
    +----+----+--------+-------+--------+----------------------+
    |1   |ABC |Loc1    |Seller1|10      |Shop, location        |
    |1   |ABC |Loc1    |Seller2|10      |Shop, location        |
    |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|
    |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |
    |3   |BCD |Loc1    |Seller1|10      |location,Seller       |
    |3   |BCD |Loc1    |Seller2|10      |location              |
    |3   |CDE |Loc2    |Seller3|10      |location              |
    +----+----+--------+-------+--------+----------------------+
    root
     |-- User: integer (nullable = true)
     |-- Shop: string (nullable = true)
     |-- Location: string (nullable = true)
     |-- Seller: string (nullable = true)
     |-- Quantity: integer (nullable = true)
     |-- GroupBYClause: string (nullable = true)
    // Output
    +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
    |User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |map1                                              |sum_quantity|
    +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
    |1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20          |
    |1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20          |
    |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10          |
    |3   |BCD |Loc1    |Seller1|10      |location,Seller       |[Loc1, Seller1]     |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|10          |
    |3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10          |
    |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10          |
    |3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|10          |
    +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
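
To make the per-row key concrete, here is a minimal plain-Python sketch (no Spark) of the same idea on the Edit 1 input: each row's clause is split into column names, the names are swapped for that row's own values, and quantities are summed per (User, values) key. The helper names `group_key` and `sum_by_row_clause` and the dict-based rows are assumptions for illustration only, and the case-insensitive name matching is a choice of this sketch.

```python
import re
from collections import defaultdict

COLUMNS = ("User", "Shop", "Location", "Seller", "Quantity", "GroupBYClause")
DATA = [
    (1, "ABC", "Loc1", "Seller1", 10, "Shop, location"),
    (1, "ABC", "Loc1", "Seller2", 10, "Shop, location"),
    (2, "ABC", "Loc1", "Seller1", 10, "Shop, location, Seller"),
    (2, "ABC", "Loc1", "Seller2", 10, "Shop, location,Seller"),
    (3, "BCD", "Loc1", "Seller1", 10, "location,Seller"),
    (3, "BCD", "Loc1", "Seller2", 10, "location"),
    (3, "CDE", "Loc2", "Seller3", 10, "location"),
]
ROWS = [dict(zip(COLUMNS, values)) for values in DATA]


def group_key(row):
    """Build (User, value1, value2, ...) from the row's own clause."""
    # Assumption of this sketch: column names are matched case-insensitively,
    # because the clause says "location" while the column is "Location".
    by_lower = {name.lower(): value for name, value in row.items()}
    names = re.split(r"\s*,\s*", row["GroupBYClause"].strip())
    return (row["User"],) + tuple(by_lower[name.lower()] for name in names)


def sum_by_row_clause(rows):
    """Attach sum_quantity, summed over rows that share the same key."""
    totals = defaultdict(int)
    for row in rows:
        totals[group_key(row)] += row["Quantity"]
    return [dict(row, sum_quantity=totals[group_key(row)]) for row in rows]


sums = [row["sum_quantity"] for row in sum_by_row_clause(ROWS)]
print(sums)
```

Because the key is built from each row's own clause, the two BCD/Loc1 rows of user 3 end up in different groups once their clauses differ, which is why both show 10 in the Edit 1 output.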

Answer #2 (6bc51xsx)

You can collect all distinct GroupBYClause values, build a window for each one, and wrap them in a when(...).otherwise(...) chain:

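    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._ // enables the $"col" syntax

    // Collect the distinct clauses to the driver (assumes there are only a few of them)
    val distinctGroupBY = df.select("GroupBYClause").distinct().as(Encoders.STRING).collect()
    // One when(...) branch per clause, each summing over a window partitioned by that clause's columns
    val sumQuantityColumn = distinctGroupBY
      .foldLeft(lit(0))(
        (acc, fieldList) =>
          when($"GroupBYClause" === fieldList,
            sum("Quantity").over(Window.partitionBy("UserId", fieldList.split(",").map(_.trim): _*)))
            .otherwise(acc)
      )
    val result = df.withColumn("Sum(Quantity)", sumQuantityColumn)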
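
The fold above builds one window expression per distinct clause and lets each row pick the branch that matches its own clause. A minimal plain-Python sketch of that control flow (no Spark) may make the cost visible: the data is grouped once per distinct clause. The function name `sum_per_clause`, the dict-based rows, and the case-insensitive name lookup are assumptions of this sketch, not part of the answer's code.

```python
from collections import defaultdict

COLUMNS = ("UserId", "Shop", "Location", "Seller", "Quantity", "GroupBYClause")
DATA = [
    (1, "ABC", "Loc1", "Seller1", 10, "Shop, location"),
    (1, "ABC", "Loc1", "Seller2", 10, "Shop, location"),
    (2, "ABC", "Loc1", "Seller1", 10, "Shop, location, Seller"),
    (2, "ABC", "Loc1", "Seller2", 10, "Shop, location, Seller"),
    (3, "BCD", "Loc1", "Seller1", 10, "location"),
    (3, "BCD", "Loc1", "Seller2", 10, "location"),
    (3, "CDE", "Loc2", "Seller3", 10, "location"),
]
ROWS = [dict(zip(COLUMNS, values)) for values in DATA]

# Assumption of this sketch: clause names are matched case-insensitively,
# because the clause says "location" while the column is "Location".
LOOKUP = {name.lower(): name for name in COLUMNS}


def clause_columns(clause):
    """Parse a clause the same way as split(",") followed by trim."""
    return [LOOKUP[name.strip().lower()] for name in clause.split(",")]


def sum_per_clause(rows):
    """Return one sum per row, taken from the grouping its clause names."""
    grouped = {}
    for clause in {row["GroupBYClause"] for row in rows}:
        keys = ["UserId"] + clause_columns(clause)
        totals = defaultdict(int)
        for row in rows:  # one full pass over the data per distinct clause
            totals[tuple(row[k] for k in keys)] += row["Quantity"]
        grouped[clause] = (keys, totals)
    result = []
    for row in rows:
        keys, totals = grouped[row["GroupBYClause"]]
        result.append(totals[tuple(row[k] for k in keys)])
    return result


print(sum_per_clause(ROWS))
```

In this sketch each pass sums over all rows of a user, including rows whose own clause is different, so mixed clauses within one user behave differently from a key built out of each row's own values.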

Answer #3 (dldeef67)

The cube function is useful here. Take a look:

    # Import and test data
    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    tst = spark.createDataFrame([
        (1, "ABC", "Loc1", "Seller1", 10, "shop,location"),
        (1, "ABC", "Loc1", "Seller2", 10, "shop,location"),
        (2, "ABC", "Loc1", "Seller1", 10, "shop,location,seller"),
        (2, "ABC", "Loc1", "Seller2", 10, "shop,location,seller"),
        (3, "BCD", "Loc1", "Seller1", 10, "location"),
        (3, "BCD", "Loc1", "Seller2", 10, "location"),
        (3, "CDE", "Loc2", "Seller3", 10, "location")
    ]).toDF("UserId", "shop", "location", "seller", "quantity", "GroupBYClause")
    # split the groupby clause columns to use in udf
    tst_in = tst.withColumn("grp_arr", F.split('GroupBYClause', ','))
    # udf to fetch the values of the columns named in the groupby clause
    @F.udf(ArrayType(StringType()))
    def gen_arr(row, group_clause):
        res = [row[x] for x in group_clause]
        return res
    # The struct can also be constructed for a large column list by list comprehension
    tst_in1 = tst_in.withColumn("grp_coln_arr", gen_arr(F.struct(F.col('shop'), F.col('location'), F.col('seller')), F.col('grp_arr')))
    # %% perform a cubed aggregation; nulls (rolled-up columns) become a placeholder string
    tst_cub = tst.cube('shop', 'location', 'seller').agg(F.sum('quantity').alias('sum_q')).fillna('special_character')
    # %% in the cubed aggregation, find the relevant rows by dropping the placeholder
    tst_cub1 = tst_cub.withColumn('grp_arr_cub', F.array('shop', 'location', 'seller'))
    tst_cub2 = tst_cub1.withColumn("grp_arr_cub1", F.array_remove(F.col("grp_arr_cub"), 'special_character'))
    # %% select the needed quantities using join
    tst_res = tst_in1.join(tst_cub2, tst_in1.grp_coln_arr == tst_cub2.grp_arr_cub1, how='left')

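Result (all columns are included to make the steps easier to follow; drop the helper columns if you use this). Note that sum_q here is aggregated across all users, because UserId is not part of the cube; to get the per-user sums from the question, UserId would also have to be included in the cube and in the join condition.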

    In [99]: tst_res.show()
    +------+----+--------+-------+--------+--------------------+--------------------+--------------------+-----------------+--------+-----------------+-----+--------------------+--------------------+
    |UserId|shop|location| seller|quantity|       GroupBYClause|             grp_arr|        grp_coln_arr|             shop|location|           seller|sum_q|         grp_arr_cub|        grp_arr_cub1|
    +------+----+--------+-------+--------+--------------------+--------------------+--------------------+-----------------+--------+-----------------+-----+--------------------+--------------------+
    |     2| ABC|    Loc1|Seller1|      10|shop,location,seller|[shop, location, ...|[ABC, Loc1, Seller1]|              ABC|    Loc1|          Seller1|   20|[ABC, Loc1, Seller1]|[ABC, Loc1, Seller1]|
    |     2| ABC|    Loc1|Seller2|      10|shop,location,seller|[shop, location, ...|[ABC, Loc1, Seller2]|              ABC|    Loc1|          Seller2|   20|[ABC, Loc1, Seller2]|[ABC, Loc1, Seller2]|
    |     3| BCD|    Loc1|Seller1|      10|            location|          [location]|              [Loc1]|special_character|    Loc1|special_character|   60|[special_characte...|              [Loc1]|
    |     3| BCD|    Loc1|Seller2|      10|            location|          [location]|              [Loc1]|special_character|    Loc1|special_character|   60|[special_characte...|              [Loc1]|
    |     3| CDE|    Loc2|Seller3|      10|            location|          [location]|              [Loc2]|special_character|    Loc2|special_character|   10|[special_characte...|              [Loc2]|
    |     1| ABC|    Loc1|Seller1|      10|       shop,location|    [shop, location]|         [ABC, Loc1]|              ABC|    Loc1|special_character|   40|[ABC, Loc1, speci...|         [ABC, Loc1]|
    |     1| ABC|    Loc1|Seller2|      10|       shop,location|    [shop, location]|         [ABC, Loc1]|              ABC|    Loc1|special_character|   40|[ABC, Loc1, speci...|         [ABC, Loc1]|
    +------+----+--------+-------+--------+--------------------+--------------------+--------------------+-----------------+--------+-----------------+-----+--------------------+--------------------+
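
As a hedged illustration of the cube-then-lookup idea with the user kept in every grouping, here is a plain-Python sketch (no Spark). It precomputes the sum for every subset of the three dimensions, which is what a cube produces, and then picks the subset named by each row's clause. The names `cube_sums` and `lookup` and the dict-based rows are assumptions for illustration only.

```python
from collections import defaultdict
from itertools import combinations

DIMS = ("shop", "location", "seller")
COLUMNS = ("UserId",) + DIMS + ("quantity", "GroupBYClause")
DATA = [
    (1, "ABC", "Loc1", "Seller1", 10, "shop,location"),
    (1, "ABC", "Loc1", "Seller2", 10, "shop,location"),
    (2, "ABC", "Loc1", "Seller1", 10, "shop,location,seller"),
    (2, "ABC", "Loc1", "Seller2", 10, "shop,location,seller"),
    (3, "BCD", "Loc1", "Seller1", 10, "location"),
    (3, "BCD", "Loc1", "Seller2", 10, "location"),
    (3, "CDE", "Loc2", "Seller3", 10, "location"),
]
ROWS = [dict(zip(COLUMNS, values)) for values in DATA]


def cube_sums(rows):
    """Sum quantity for every subset of DIMS, always keeping UserId."""
    totals = defaultdict(int)
    for size in range(len(DIMS) + 1):
        for subset in combinations(DIMS, size):
            for row in rows:
                key = (row["UserId"], subset, tuple(row[d] for d in subset))
                totals[key] += row["quantity"]
    return totals


def lookup(rows, totals):
    """For each row, read the cube cell that its clause points at."""
    result = []
    for row in rows:
        wanted = set(row["GroupBYClause"].split(","))
        subset = tuple(d for d in DIMS if d in wanted)  # keep DIMS order
        key = (row["UserId"], subset, tuple(row[d] for d in subset))
        result.append(totals[key])
    return result


totals = cube_sums(ROWS)
per_user = lookup(ROWS, totals)
print(per_user)
```

Keying each cell on the subset of column names, rather than on an array of the remaining values, is a design choice of this sketch; it avoids confusing two groupings whose value arrays happen to look the same.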