How can I create a pivot like this in PySpark?

Asked by ztmd8pv5 on 2023-02-05 (Apache)

I have a PySpark DataFrame df:
| STORE | COL_APPLE_BB | COL_APPLE_NONBB | COL_PEAR_BB | COL_PEAR_NONBB | COL_ORANGE_BB | COL_ORANGE_NONBB | COL_GRAPE_BB | COL_GRAPE_NONBB |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | 28 | 24 | 24 | 32 | 26 | 54 | 60 | 36 |
| 2 | 19 | 12 | 24 | 13 | 10 | 24 | 29 | 10 |
I have another PySpark DataFrame df2:
| STORE | PDT | FRUIT | TYPE |
| --- | --- | --- | --- |
| 1 | 1 | APPLE | BB |
| 1 | 2 | ORANGE | NONBB |
| 1 | 3 | PEAR | BB |
| 1 | 4 | GRAPE | BB |
| 1 | 5 | APPLE | BB |
| 1 | 6 | ORANGE | BB |
| 2 | 1 | PEAR | NONBB |
| 2 | 2 | ORANGE | NONBB |
| 2 | 3 | APPLE | NONBB |
Expected result: df2 with an added column COL_VALUE, looked up from df by STORE, FRUIT and TYPE:
| STORE | PDT | FRUIT | TYPE | COL_VALUE |
| --- | --- | --- | --- | --- |
| 1 | 1 | APPLE | BB | 28 |
| 1 | 2 | ORANGE | NONBB | 54 |
| 1 | 3 | PEAR | BB | 24 |
| 1 | 4 | GRAPE | BB | 60 |
| 1 | 5 | APPLE | BB | 28 |
| 1 | 6 | ORANGE | BB | 26 |
| 2 | 1 | PEAR | NONBB | 13 |
| 2 | 2 | ORANGE | NONBB | 24 |
| 2 | 3 | APPLE | NONBB | 12 |


cwdobuhd1#

from pyspark.sql.functions import *

df = spark.createDataFrame(
    [
        (1, 28, 24, 24, 32, 26, 54, 60, 36),
        (2, 19, 12, 24, 13, 10, 24, 29, 10)
    ],
    ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB", "COL_PEAR_BB", "COL_PEAR_NONBB",
     "COL_ORANGE_BB", "COL_ORANGE_NONBB", "COL_GRAPE_BB", "COL_GRAPE_NONBB"]
)

df2 = spark.createDataFrame(
    [
        (1, 1,  "APPLE",    "BB"),
        (1, 2,  "ORANGE",   "NONBB"),
        (1, 3,  "PEAR", "BB"),
        (1, 4, "GRAPE", "BB"),
        (1, 5,  "APPLE",    "BB"),
        (1, 6,  "ORANGE",   "BB"),
        (2, 1,  "PEAR", "NONBB"),
        (2, 2, "ORANGE",    "NONBB"),
        (2, 3,  "APPLE",    "NONBB")
    ],
    ["STORE", "PDT", "FRUIT", "TYPE"]
)

# Unpivot df: one output row per (STORE, FRUIT_TYPE) pair with its value
unPivot_df = df.select(
    "STORE",
    expr("""stack(8,
        'APPLE_BB',     COL_APPLE_BB,
        'APPLE_NONBB',  COL_APPLE_NONBB,
        'PEAR_BB',      COL_PEAR_BB,
        'PEAR_NONBB',   COL_PEAR_NONBB,
        'ORANGE_BB',    COL_ORANGE_BB,
        'ORANGE_NONBB', COL_ORANGE_NONBB,
        'GRAPE_BB',     COL_GRAPE_BB,
        'GRAPE_NONBB',  COL_GRAPE_NONBB) as (Appended, COL_VALUE)""")
)

# Build the matching FRUIT_TYPE key on df2, then join on STORE + Appended
df2 = df2.withColumn("Appended", concat_ws('_', col("FRUIT"), col("TYPE")))
df2 = df2.join(unPivot_df, ['STORE', "Appended"], "left")
df2.show()

+-----+------------+---+------+-----+---------+
|STORE|    Appended|PDT| FRUIT| TYPE|COL_VALUE|
+-----+------------+---+------+-----+---------+
|    1|ORANGE_NONBB|  2|ORANGE|NONBB|       54|
|    1|     PEAR_BB|  3|  PEAR|   BB|       24|
|    1|    GRAPE_BB|  4| GRAPE|   BB|       60|
|    1|    APPLE_BB|  1| APPLE|   BB|       28|
|    2|ORANGE_NONBB|  2|ORANGE|NONBB|       24|
|    2| APPLE_NONBB|  3| APPLE|NONBB|       12|
|    1|   ORANGE_BB|  6|ORANGE|   BB|       26|
|    1|    APPLE_BB|  5| APPLE|   BB|       28|
|    2|  PEAR_NONBB|  1|  PEAR|NONBB|       13|
+-----+------------+---+------+-----+---------+

9udxz4iz2#

If you have Spark 3.2 or later, you can use something like this:

data = data.melt(
    id_vars=['STORE'],
    value_vars=data.columns[1:], 
    var_name="variable", 
    value_name="value"
)

to get the dataset into "long" format, and then use regexp_extract twice to pull the information you need out of the variable column.
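As a minimal sketch of that second step (assuming the melted result is an ordinary Spark DataFrame with the variable/value columns produced above):

from pyspark.sql import functions as F

# Sketch only: split names like "COL_APPLE_BB" into FRUIT and TYPE.
pattern = r"COL_([A-Z]+)_([A-Z]+)"
data = (
    data
    .withColumn("FRUIT", F.regexp_extract("variable", pattern, 1))   # e.g. APPLE
    .withColumn("TYPE", F.regexp_extract("variable", pattern, 2))    # e.g. BB / NONBB
    .withColumnRenamed("value", "COL_VALUE")
)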
For earlier versions of Spark, use the following:

def process_row(row):
    # Turn one wide row into (STORE, index, FRUIT, TYPE, value) tuples.
    # Assumes STORE is the first column, so `store` is set before it is used.
    output = []
    for index, key in enumerate(row.asDict()):
        if key == "STORE":
            store = row[key]
        else:
            # Column names look like COL_APPLE_BB -> fruit "APPLE", type "BB"
            _, fruit, fruit_type = key.split("_")
            output.append((store, index, fruit, fruit_type, row[key]))
    return output

data = data.rdd.flatMap(process_row).toDF(
    schema=["STORE", "PDT", "FRUIT", "TYPE", "COLUMN_VALUE"]
)
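The answer stops at the long table; as a hedged sketch, one way to reach the asker's expected output from here is to join it back onto df2 from the question on STORE, FRUIT and TYPE:

# Sketch only: attach the value from the long table to the question's df2.
result = df2.join(
    data.select("STORE", "FRUIT", "TYPE", "COLUMN_VALUE"),
    on=["STORE", "FRUIT", "TYPE"],
    how="left",
)
result.show()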

n3h0vuf23#

Besides melt, you can also use stack in earlier Spark versions:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        (1, 28, 24),
        (2, 19, 12),
    ],
    ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB"]
)

df2 = spark.createDataFrame(
    [
        (1, 1, "APPLE", "BB"),
        (1, 2, "ORANGE", "NONBB"),
        (1, 2, "APPLE", "NONBB"),
        (2, 3, "APPLE", "NONBB")
    ],
    ["STORE", "PDT", "FRUIT", "TYPE"]
)

Create a column matching the "COL_FRUIT_TYPE" names in df:

df3 = df2.withColumn("fruit_type", F.concat(F.lit("COL_"), F.col("FRUIT"), F.lit("_"), F.col("TYPE")))
df3.show(10, False)

which gives:

+-----+---+------+-----+----------------+
|STORE|PDT|FRUIT |TYPE |fruit_type      |
+-----+---+------+-----+----------------+
|1    |1  |APPLE |BB   |COL_APPLE_BB    |
|1    |2  |ORANGE|NONBB|COL_ORANGE_NONBB|
|1    |2  |APPLE |NONBB|COL_APPLE_NONBB |
+-----+---+------+-----+----------------+

Then "unpivot" the first df:

from pyspark.sql.functions import expr

unpivotExpr = "stack({}, {}) as (fruit_type, COL_VALUE)".format(
    len(df.columns) - 1,
    ", ".join("'{}', {}".format(c, c) for c in df.columns[1:])
)
print(unpivotExpr)
unPivotDF = df.select("STORE", expr(unpivotExpr)) \
              .where("STORE is not null")

unPivotDF.show(truncate=False)

The arguments to stack are:

- the number of "columns" to be unpivoted; here it is derived as len(df.columns) - 1, since we skip the STORE column;
- a list of 'col_name', value pairs. The "'{}', {}".format(c, c) for c in df.columns[1:] part takes the columns of df, skips the first one (STORE), and for each remaining column produces a pair such as 'COL_APPLE_BB', COL_APPLE_BB. These pairs are then joined into a single comma-separated string (", ".join()) and substituted into the {} placeholder.

For reference, this is how stack is usually called by hand: "stack(2, 'COL_APPLE_BB', COL_APPLE_BB, 'COL_APPLE_NONBB', COL_APPLE_NONBB) as (fruit_type, COL_VALUE)".

unPivotDF.show(truncate=False) outputs:

+-----+---------------+---------+
|STORE|fruit_type     |COL_VALUE|
+-----+---------------+---------+
|1    |COL_APPLE_BB   |28       |
|1    |COL_APPLE_NONBB|24       |
|2    |COL_APPLE_BB   |19       |
|2    |COL_APPLE_NONBB|12       |
+-----+---------------+---------+

And join the two together:

df3.join(unPivotDF, ["fruit_type", "STORE"], "left")\
   .select("STORE", "PDT", "FRUIT", "TYPE", "COL_VALUE").show(40, False)

Result:

+-----+---+------+-----+---------+
|STORE|PDT|FRUIT |TYPE |COL_VALUE|
+-----+---+------+-----+---------+
|1    |2  |ORANGE|NONBB|null     |
|1    |2  |APPLE |NONBB|24       |
|1    |1  |APPLE |BB   |28       |
|2    |3  |APPLE |NONBB|12       |
+-----+---+------+-----+---------+

The downside is that you need to enumerate the column names in stack; if I find a way to do this automatically, I will update the answer.
EDIT: I have updated the stack usage above so that it derives the columns by itself.
