正如主题所描述的,我有一个PySpark Dataframe,我需要将三列合并为行。每一列本质上代表一个类别中的一个事实。最终目标是将数据聚合为每个类别的一个总数。
这个dataframe中有数千万行,所以我需要一种方法来在spark集群上进行转换,而不将任何数据带回驱动程序(在本例中为Jupyter)。
下面是我的数据框架的摘录,只是几个商店:+-----------+----------------+-----------------+----------------+ | store_id |qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs| +-----------+----------------+-----------------+----------------+ | 100| 30| 105| 35| | 200| 55| 85| 65| | 300| 20| 125| 90| +-----------+----------------+-----------------+----------------+
这里是期望的结果 Dataframe ,每个存储多行,其中原始 Dataframe 的列已经被融合到新 Dataframe 的行中,其中新类别列中的每个原始列一行:+-----------+--------+-----------+ | product_id|CATEGORY|qty_on_hand| +-----------+--------+-----------+ | 100| milk| 30| | 100| bread| 105| | 100| eggs| 35| | 200| milk| 55| | 200| bread| 85| | 200| eggs| 65| | 300| milk| 20| | 300| bread| 125| | 300| eggs| 90| +-----------+--------+-----------+
最后,我想聚合得到的dataframe以获得每个类别的总数:+--------+-----------------+ |CATEGORY|total_qty_on_hand| +--------+-----------------+ | milk| 105| | bread| 315| | eggs| 190| +--------+-----------------+
更新:有建议说这个问题是重复的,可以用here来回答。事实并非如此,因为解决方案将行转换为列,我需要做相反的操作,将列融化为行。
5条答案
按热度按时间ulmd4ohb1#
我们可以使用explode()函数来解决这个问题。在Python中,同样的事情可以用
melt
来完成。写下面的函数,它应该
explode
这个DataFrame:将此DataFrame上的函数应用于
explode
,则-现在,我们需要从
CATEGORY
列中删除字符串qty_on_hand_
。这可以使用expr()函数完成。注意expr
遵循基于1的子字符串索引,而不是0 -最后,使用agg()函数聚合按
CATEGORY
分组的列qty_on_hand
-gk7wooem2#
我认为你应该使用
array
和explode
来实现这一点,你不需要任何复杂的UDF逻辑或自定义函数。array
会将列合并为一列,或者对列进行注解。explode
将把一个数组列转换成一组行。你需要做的就是:
注意如何使用
col('foo')[INDEX]
提取数组列的元素;不需要将它们分成单独的列。这种方法在不同的数据类型上也是健壮的,因为它不试图在每一行上强制使用相同的模式(与使用结构体不同)。
例如,如果'qty_on_hand_bread'是一个字符串,这仍然有效,结果模式将只是:
下面是相同的代码,一步一步地使它明显发生了什么:
...以及每个步骤的输出:
ehxuflar3#
使用pyspark的-
col,when, functions
模块可以实现此目的jjjwad0x4#
迟来的答案。列表理解和内联函数可以融化df。
ryoqjall5#
下面是实现它的函数