PySpark: pivot and create array columns

hmmo2u0o  asked on 2021-07-09  in Spark

I have the following input:

+---------+-------+--------+----------+----------+
|timestamp|user_id|results |event_name|product_id|
+---------+-------+--------+----------+----------+
|1000     |user_1 |result 1|Click     |1         |
|1001     |user_1 |result 1|View      |1         |
|1002     |user_1 |result 2|Click     |3         |
|1003     |user_1 |result 2|View      |4         |
|1004     |user_1 |result 2|View      |5         |
+---------+-------+--------+----------+----------+

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- results: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- product_id: string (nullable = true)
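
For reference, a minimal sketch that reproduces this input (a reconstruction from the table above; the timestamps are kept as plain integers here rather than a true timestamp type):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# rebuild the sample input shown above (timestamps simplified to integers)
df = spark.createDataFrame(
    [(1000, "user_1", "result 1", "Click", "1"),
     (1001, "user_1", "result 1", "View",  "1"),
     (1002, "user_1", "result 2", "Click", "3"),
     (1003, "user_1", "result 2", "View",  "4"),
     (1004, "user_1", "result 2", "View",  "5")],
    ["timestamp", "user_id", "results", "event_name", "product_id"],
)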

I want to transform this into the following, keeping one row per unique combination of user_id and results and aggregating the product_ids into arrays according to the event_name, like so:

+-------+--------+---------------+---------------+
|user_id|results |product_clicked|products_viewed|
+-------+--------+---------------+---------------+
|user_1 |result 1|[1]            |[1]            |
|user_1 |result 2|[3]            |[4,5]          |
+-------+--------+---------------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- results: string (nullable = true)
 |-- product_clicked: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- products_viewed: array (nullable = true)
 |    |-- element: string (containsNull = true)

I have looked at pivot and it comes close, but I don't need its aggregation part; instead, I need arrays built inside the columns that the pivot creates from the event_name column. I don't know how to do that.
Note: the ordering inside the product_clicked and products_viewed columns above matters, and it is based on the timestamp column of the input DataFrame.

Answer by yrdbyhpb:

You can use collect_list during the pivot aggregation:

import pyspark.sql.functions as F

# group by each (user_id, results) pair, pivot on event_name,
# and collect the product_ids of each event type into an array
df2 = (df.groupBy('user_id', 'results')
         .pivot('event_name')
         .agg(F.collect_list('product_id'))
         .selectExpr("user_id", "results", "Click as product_clicked", "View as product_viewed")
      )

df2.show()
+-------+--------+---------------+--------------+
|user_id| results|product_clicked|product_viewed|
+-------+--------+---------------+--------------+
| user_1|result 2|            [3]|        [4, 5]|
| user_1|result 1|            [1]|           [1]|
+-------+--------+---------------+--------------+
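
As a side note, if the event names are known in advance they can be passed to pivot explicitly. This is only a small variation on the code above: it pins the output columns to exactly Click and View and lets Spark skip the extra pass it would otherwise run to discover the distinct event_name values.

# same pivot, but with the pivot values listed explicitly
df2 = (df.groupBy('user_id', 'results')
         .pivot('event_name', ['Click', 'View'])
         .agg(F.collect_list('product_id'))
         .selectExpr("user_id", "results", "Click as product_clicked", "View as product_viewed")
      )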

To guarantee the ordering, you can collect a list of structs that contain the timestamp, sort that list, and then transform it to keep only the product_id. Because sort_array compares structs field by field, placing timestamp first makes it the sort key:

# collect (timestamp, product_id) structs, sort them (timestamp is the
# first field, so it drives the order), then keep only the product_id
df2 = (df.groupBy('user_id', 'results')
         .pivot('event_name')
         .agg(F.sort_array(F.collect_list(F.struct('timestamp', 'product_id'))))
         .selectExpr("user_id", "results", "transform(Click, x -> x.product_id) as product_clicked", "transform(View, x -> x.product_id) as product_viewed")
      )

df2.show()
+-------+--------+---------------+--------------+
|user_id| results|product_clicked|product_viewed|
+-------+--------+---------------+--------------+
| user_1|result 2|            [3]|        [4, 5]|
| user_1|result 1|            [1]|           [1]|
+-------+--------+---------------+--------------+
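
For completeness, the same shape can also be sketched without pivot, using one conditional collect_list per event type; this is an alternative to the answer above, not part of it, and assumes the set of event names (Click, View) is fixed. collect_list skips the nulls that when() produces for non-matching rows, and sorting the (timestamp, product_id) structs keeps the timestamp ordering:

# alternative without pivot: one conditional collect_list per event type,
# sorted by timestamp via the struct trick, then reduced to product_id only
clicked = F.sort_array(F.collect_list(
    F.when(F.col('event_name') == 'Click', F.struct('timestamp', 'product_id'))))
viewed = F.sort_array(F.collect_list(
    F.when(F.col('event_name') == 'View', F.struct('timestamp', 'product_id'))))

df3 = (df.groupBy('user_id', 'results')
         .agg(clicked.alias('clicked'), viewed.alias('viewed'))
         .selectExpr("user_id", "results",
                     "transform(clicked, x -> x.product_id) as product_clicked",
                     "transform(viewed, x -> x.product_id) as product_viewed")
      )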
