pyspark 基于聚合结果的透视列

wfveoks0  于 2022-11-01  发布在  Spark
关注(0)|答案(3)|浏览(224)

我有一个PySpark Dataframe :
| 用户标识|库|作用|
| - -|- -|- -|
| 一百二十三|小行星2345| 2个|
| 一百二十三|小行星2345|第0页|
| 一百二十三|小行星5422|第0页|
| 一百二十三|小行星762|第0页|
| 二百三十一|小行星4322| 2个|
| 二百三十一|小行星4322|第0页|
| 二百三十一|小行星8342|第0页|
| 二百三十一|小行星5342|第0页|
输出应如下所示:
| 用户标识|sku_pos| sku_neg|
| - -|- -|- -|
| 一百二十三|小行星2345|小行星5422|
| 一百二十三|小行星2345|小行星762|
| 二百三十一|小行星4322|小行星8342|
| 二百三十一|小行星4322|小行星5342|
对于每个不同的“userid”,“action”不大于0的“sku”将进入“sku_neg”列,而“action”大于0的“sku”将进入“sku_pos”列。

bnlyeluc

bnlyeluc1#

需要几个聚合:

  • 首先,将 pos/neg 状态分配给“sku”
  • 然后在第二次汇总中使用此状态将所有“sku”收集到列表中

最后,展开列表。
输入:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('123', '2345', 2),
     ('123', '2345', 0),
     ('123', '5422', 0),
     ('123', '7622', 0),
     ('231', '4322', 2),
     ('231', '4322', 0),
     ('231', '8342', 0),
     ('231', '5342', 0)],
    ['userid', 'sku', 'action'])

脚本:

df = df.groupBy('userid', 'sku').agg(
    F.when(F.max('action') > 0, 'p').otherwise('n').alias('_flag')
)
df = (df
    .groupBy('userid').pivot('_flag', ['p', 'n']).agg(F.collect_list('sku'))
    .withColumn('sku_pos', F.explode('p'))
    .withColumn('sku_neg', F.explode('n'))
    .drop('p', 'n')
)

df.show()

# +------+-------+-------+

# |userid|sku_pos|sku_neg|

# +------+-------+-------+

# |   231|   4322|   5342|

# |   231|   4322|   8342|

# |   123|   2345|   7622|

# |   123|   2345|   5422|

# +------+-------+-------+
monwx1rj

monwx1rj2#

通过过滤pos/neg记录并按“userid”分组来创建正 Dataframe 和负 Dataframe :

df_pos = df \
  .filter(F.col("action") > 0) \
  .groupBy("userid") \
  .agg(F.collect_set("sku").alias("sku_pos_list")) \
  .withColumnRenamed("userid", "userid_pos")

[Out]:
+----------+------------+
|userid_pos|sku_pos_list|
+----------+------------+
|       123|      [2345]|
|       231|      [4322]|
+----------+------------+

df_neg = df \
  .filter(F.col("action") <= 0) \
  .groupBy("userid") \
  .agg(F.collect_set("sku").alias("sku_neg_list")) \
  .withColumnRenamed("userid", "userid_neg")

[Out]:
+----------+------------------+
|userid_neg|      sku_neg_list|
+----------+------------------+
|       123|[2345, 5422, 7622]|
|       231|[8342, 4322, 5342]|
+----------+------------------+

连接正负 Dataframe 并分解正/负记录:

df_joined = df_pos.join(df_neg, (F.col("userid_pos")==F.col("userid_neg")), how="full")

# Clean up null, empty

df_joined = df_joined \
  .withColumn("userid", F.when(F.col("userid_pos").isNotNull(), F.col("userid_pos")).otherwise(F.col("userid_neg"))).drop("userid_pos", "userid_neg") \
  .withColumn("sku_pos_list", F.when(F.col("sku_pos_list").isNull(), F.array([F.lit(-1)])).otherwise(F.col("sku_pos_list"))) \
  .withColumn("sku_neg_list", F.when(F.col("sku_neg_list").isNull(), F.array([F.lit(-1)])).otherwise(F.col("sku_neg_list")))

[Out]:
+------------+------------------+------+
|sku_pos_list|sku_neg_list      |userid|
+------------+------------------+------+
|[2345]      |[2345, 5422, 7622]|123   |
|[4322]      |[8342, 4322, 5342]|231   |
+------------+------------------+------+

df_joined = df_joined \
  .withColumn("sku_pos", F.explode("sku_pos_list")) \
  .withColumn("sku_neg", F.explode("sku_neg_list")) \
  .drop("sku_pos_list", "sku_neg_list") \
  .filter(F.col("sku_pos") != F.col("sku_neg"))

[Out]:
+------+-------+-------+
|userid|sku_pos|sku_neg|
+------+-------+-------+
|   123|   2345|   5422|
|   123|   2345|   7622|
|   231|   4322|   8342|
|   231|   4322|   5342|
+------+-------+-------+

使用的数据集:

df = spark.createDataFrame([
  (123,2345,2),
  (123,2345,0),
  (123,5422,0),
  (123,7622,0),
  (231,4322,2),
  (231,4322,0),
  (231,8342,0),
  (231,5342,0),
], ["userid", "sku", "action"])
oo7oh9g9

oo7oh9g93#

另一个解决方案看起来很好,但为了以防万一,另一个方法不需要连接。注意,我假设每个userid只有一个sku_pos。如果不是这样,这个方法就不起作用。

spark.read.option("header", "true").csv("sku")\
    .withColumn("action", f.col("action") > 0)\
    .groupBy("userid", "sku")\
    .agg(f.max("action").alias("action"))\
    .groupBy("userid", "action")\
    .agg(f.collect_set("sku").alias("skus"))\
    .withColumn("sku_pos", f.col("skus").getItem(0))\
    .withColumn("sku_neg", f.when(~ f.col("action"), f.col("skus")))\
    .groupBy("userid")\
    .agg(f.first("sku_pos").alias("sku_pos"), f.first("sku_neg", ignorenulls=True).alias("sku_neg"))\
    .withColumn("sku_neg", f.explode("sku_neg"))\
    .show()\
+------+-------+-------+
|userid|sku_pos|sku_neg|
+------+-------+-------+
|   123|   5422|   5422|
|   123|   5422|   7622|
|   231|   4322|   5342|
|   231|   4322|   8342|
+------+-------+-------+

基本上,首先使用groupBy分别收集正的和负的sku,然后使用f.col("skus").getItem(0)只选择一个sku_pos,使用另一个groupBy使每个userid有一行,最后分解sku_neg数组。

相关问题