PySpark Dataframe将列融化成行

voj3qocg  于 2023-03-28  发布在  Spark
关注(0)|答案(5)|浏览(124)

正如主题所描述的,我有一个PySpark Dataframe,我需要将三列合并为行。每一列本质上代表一个类别中的一个事实。最终目标是将数据聚合为每个类别的一个总数。
这个dataframe中有数千万行,所以我需要一种方法来在spark集群上进行转换,而不将任何数据带回驱动程序(在本例中为Jupyter)。
下面是我的数据框架的摘录,只是几个商店:+-----------+----------------+-----------------+----------------+ | store_id |qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs| +-----------+----------------+-----------------+----------------+ | 100| 30| 105| 35| | 200| 55| 85| 65| | 300| 20| 125| 90| +-----------+----------------+-----------------+----------------+
这里是期望的结果 Dataframe ,每个存储多行,其中原始 Dataframe 的列已经被融合到新 Dataframe 的行中,其中新类别列中的每个原始列一行:+-----------+--------+-----------+ | product_id|CATEGORY|qty_on_hand| +-----------+--------+-----------+ | 100| milk| 30| | 100| bread| 105| | 100| eggs| 35| | 200| milk| 55| | 200| bread| 85| | 200| eggs| 65| | 300| milk| 20| | 300| bread| 125| | 300| eggs| 90| +-----------+--------+-----------+
最后,我想聚合得到的dataframe以获得每个类别的总数:+--------+-----------------+ |CATEGORY|total_qty_on_hand| +--------+-----------------+ | milk| 105| | bread| 315| | eggs| 190| +--------+-----------------+
更新:有建议说这个问题是重复的,可以用here来回答。事实并非如此,因为解决方案将行转换为列,我需要做相反的操作,将列融化为行。

ulmd4ohb

ulmd4ohb1#

我们可以使用explode()函数来解决这个问题。在Python中,同样的事情可以用melt来完成。

# Loading the requisite packages 
from pyspark.sql.functions import col, explode, array, struct, expr, sum, lit        
# Creating the DataFrame
df = sqlContext.createDataFrame([(100,30,105,35),(200,55,85,65),(300,20,125,90)],('store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'))
df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

写下面的函数,它应该explode这个DataFrame:

def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("CATEGORY"), col(c).alias("qty_on_hand")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.CATEGORY", "kvs.qty_on_hand"])

将此DataFrame上的函数应用于explode,则-

df = to_explode(df, ['store_id'])\
     .drop('store_id')
df.show()
+-----------------+-----------+
|         CATEGORY|qty_on_hand|
+-----------------+-----------+
| qty_on_hand_milk|         30|
|qty_on_hand_bread|        105|
| qty_on_hand_eggs|         35|
| qty_on_hand_milk|         55|
|qty_on_hand_bread|         85|
| qty_on_hand_eggs|         65|
| qty_on_hand_milk|         20|
|qty_on_hand_bread|        125|
| qty_on_hand_eggs|         90|
+-----------------+-----------+

现在,我们需要从CATEGORY列中删除字符串qty_on_hand_。这可以使用expr()函数完成。注意expr遵循基于1的子字符串索引,而不是0 -

df = df.withColumn('CATEGORY',expr('substring(CATEGORY, 13)'))
df.show()
+--------+-----------+
|CATEGORY|qty_on_hand|
+--------+-----------+
|    milk|         30|
|   bread|        105|
|    eggs|         35|
|    milk|         55|
|   bread|         85|
|    eggs|         65|
|    milk|         20|
|   bread|        125|
|    eggs|         90|
+--------+-----------+

最后,使用agg()函数聚合按CATEGORY分组的列qty_on_hand-

df = df.groupBy(['CATEGORY']).agg(sum('qty_on_hand').alias('total_qty_on_hand'))
df.show()
+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    eggs|              190|
|   bread|              315|
|    milk|              105|
+--------+-----------------+
gk7wooem

gk7wooem2#

我认为你应该使用arrayexplode来实现这一点,你不需要任何复杂的UDF逻辑或自定义函数。
array会将列合并为一列,或者对列进行注解。
explode将把一个数组列转换成一组行。
你需要做的就是:

  • 用您的自定义标签注解每一列(例如“milk”)
  • 把你的标签列合并成一个数组类型的列
  • 分解标签列以生成带标签的行
  • 删除不相关的列
df = (
    df.withColumn('labels', F.explode(                         # <-- Split into rows
        F.array(                                               # <-- Combine columns
            F.array(F.lit('milk'), F.col('qty_on_hand_milk')), # <-- Annotate column
            F.array(F.lit('bread'), F.col('qty_on_hand_bread')),
            F.array(F.lit('eggs'), F.col('qty_on_hand_eggs')),
        )
    ))
    .withColumn('CATEGORY', F.col('labels')[0])
    .withColumn('qty_on_hand', F.col('labels')[1])
).select('store_id', 'CATEGORY', 'qty_on_hand')

注意如何使用col('foo')[INDEX]提取数组列的元素;不需要将它们分成单独的列。
这种方法在不同的数据类型上也是健壮的,因为它不试图在每一行上强制使用相同的模式(与使用结构体不同)。
例如,如果'qty_on_hand_bread'是一个字符串,这仍然有效,结果模式将只是:

root
 |-- store_id: long (nullable = false)
 |-- CATEGORY: string (nullable = true)
 |-- qty_on_hand: string (nullable = true) <-- Picks best schema on the fly

下面是相同的代码,一步一步地使它明显发生了什么:

import databricks.koalas as ks
import pyspark.sql.functions as F

# You don't need koalas, it's just less verbose for adhoc dataframes
df = ks.DataFrame({
    "store_id": [100, 200, 300],
    "qty_on_hand_milk": [30, 55, 20],
    "qty_on_hand_bread": [105, 85, 125],
    "qty_on_hand_eggs": [35, 65, 90],
}).to_spark()
df.show()

# Annotate each column with your custom label per row. ie. v -> ['label', v]
df = df.withColumn('label1', F.array(F.lit('milk'), F.col('qty_on_hand_milk')))
df = df.withColumn('label2', F.array(F.lit('bread'), F.col('qty_on_hand_bread')))
df = df.withColumn('label3', F.array(F.lit('eggs'), F.col('qty_on_hand_eggs')))
df.show()

# Create a new column which combines the labeled values in a single column
df = df.withColumn('labels', F.array('label1', 'label2', 'label3'))
df.show()

# Split into individual rows
df = df.withColumn('labels', F.explode('labels'))
df.show()

# You can now do whatever you want with your labelled rows, eg. split them into new columns
df = df.withColumn('CATEGORY', F.col('labels')[0])
df = df.withColumn('qty_on_hand', F.col('labels')[1])
df.show()

...以及每个步骤的输出:

|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|
+--------+----------------+-----------------+----------------+----------+------------+----------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|
+--------+----------------+-----------------+----------------+----------+------------+----------+

+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|              labels|
+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[[milk, 30], [bre...|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|[[milk, 55], [bre...|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[[milk, 20], [bre...|
+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|      labels|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [milk, 30]|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[bread, 105]|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [eggs, 35]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [milk, 55]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]| [bread, 85]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [eggs, 65]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [milk, 20]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[bread, 125]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [eggs, 90]|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|      labels|CATEGORY|qty_on_hand|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [milk, 30]|    milk|         30|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[bread, 105]|   bread|        105|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [eggs, 35]|    eggs|         35|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [milk, 55]|    milk|         55|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]| [bread, 85]|   bread|         85|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [eggs, 65]|    eggs|         65|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [milk, 20]|    milk|         20|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[bread, 125]|   bread|        125|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [eggs, 90]|    eggs|         90|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+

+--------+--------+-----------+
|store_id|CATEGORY|qty_on_hand|
+--------+--------+-----------+
|     100|    milk|         30|
|     100|   bread|        105|
|     100|    eggs|         35|
|     200|    milk|         55|
|     200|   bread|         85|
|     200|    eggs|         65|
|     300|    milk|         20|
|     300|   bread|        125|
|     300|    eggs|         90|
+--------+--------+-----------+
ehxuflar

ehxuflar3#

使用pyspark的-col,when, functions模块可以实现此目的

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import StringType
>>> concat_udf = F.udf(lambda cols: "".join([str(x) if x is not None else "*" for x in cols]), StringType())

>>> rdd = sc.parallelize([[100,30,105,35],[200,55,85,65],[300,20,125,90]])
>>> df = rdd.toDF(['store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'])

>>> df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

#adding one more column with arrayed values of all three columns
>>> df_1=df.withColumn("new_col", concat_udf(F.array("qty_on_hand_milk", "qty_on_hand_bread","qty_on_hand_eggs")))
#convert it into array<int> for carrying out agg operations
>>> df_2=df_1.withColumn("new_col_1",split(col("new_col"), ",\s*").cast("array<int>").alias("new_col_1"))
#posexplode gives you the position along with usual explode which helps in categorizing
>>> df_3=df_2.select("store_id",  posexplode("new_col_1").alias("col_1","qty"))
#if else conditioning for category column
>>> df_3.withColumn("category",F.when(col("col_1") == 0, "milk").when(col("col_1") == 1, "bread").otherwise("eggs")).select("store_id","category","qty").show()
+--------+--------+---+
|store_id|category|qty|
+--------+--------+---+
|     100|    milk| 30|
|     100|   bread|105|
|     100|    eggs| 35|
|     200|    milk| 55|
|     200|   bread| 85|
|     200|    eggs| 65|
|     300|    milk| 20|
|     300|   bread|125|
|     300|    eggs| 90|
+--------+--------+---+

#aggregating to find sum
>>> df_3.withColumn("category",F.when(col("col_1") == 0, "milk").when(col("col_1") == 1, "bread").otherwise("eggs")).select("category","qty").groupBy('category').sum().show()
+--------+--------+
|category|sum(qty)|
+--------+--------+
|    eggs|     190|
|   bread|     315|
|    milk|     105|
+--------+--------+
>>> df_3.printSchema()
root
 |-- store_id: long (nullable = true)
 |-- col_1: integer (nullable = false)
 |-- qty: integer (nullable = true)
jjjwad0x

jjjwad0x4#

迟来的答案。列表理解和内联函数可以融化df。

(#Create struct of the column names and col values
  df.withColumn('tab',F.array(*[F.struct(F.lit(k.replace('qty_on_hand_','')).alias('CATEGORY'), F.col(k).alias('qty_on_hand')) for k in df.columns if k!='store_id']))
 #Explode using the inline function
  .selectExpr('store_id as product_id',"inline(tab)")
  #groupby and sum
  .groupby('CATEGORY').agg(sum('qty_on_hand').alias('total_qty_on_hand'))
).show()

+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    eggs|              190|
|   bread|              315|
|    milk|              105|
+--------+-----------------+
ryoqjall

ryoqjall5#

下面是实现它的函数

from pyspark.sql import functions as F

def melt(df,cols,alias=('key','value')):
  other = [col for col in df.columns if col not in cols]
  for c in cols:
    df = df.withColumn(c, F.expr(f'map("{c}", cast({c} as double))'))
  df = df.withColumn('melted_cols', F.map_concat(*cols))
  return df.select(*other,F.explode('melted_cols').alias(*alias))

相关问题