pyspark将行转换为列

ojsjcaue  于 2021-07-12  发布在  Spark
关注(0)|答案(3)|浏览(363)

我有一个Dataframe,需要将同一组的行转换为列。基本上就是这些。下面是我的df。

+------------+-------+-----+-------+
|Customer    |ID     |unit |order  |
+------------+-------+-----+-------+
|John        |123    |00015|1      |
|John        |123    |00016|2      |
|John        |345    |00205|3      |
|John        |345    |00206|4      |
|John        |789    |00283|5      |
|John        |789    |00284|6      |
+------------+-------+-----+-------+

我需要的结果数据为上述。。

+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|state   | ID_1  | unit_1 |seq_num_1 | ID_2   | unit_2 | seq_num_2 | ID_3   |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John    | 123   | 00015  | 1        |  345   | 00205  | 3         |  789   |00283  | 5        |
|John    | 123   | 00016  | 2        |  345   | 00206  | 4         |  789   |00284  | 6        |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+

我尝试使用groupby和pivot()函数,但它的抛出错误表明找到了较大的pivot值。有没有不使用pivot()函数就能得到结果的方法..非常感谢您的帮助。谢谢。

vd2z7a6w

vd2z7a6w1#

这看起来像是一个典型的使用dense_rank()聚合函数创建泛型序列的例子( dr 在下面的代码)中,在每个客户组下使用不同的ID,然后按此顺序旋转。我们可以做类似的事情 order 列,以便可以在groupby中使用:

from pyspark.sql import Window, functions as F

# below I added an extra row for a reference when the number of rows vary for different IDs

df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

添加两个窗规格: w1 在客户和 w2 获取同一客户和同一id下订单的行号()。

w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')

基于上述两个WinSpec添加两个新列: dr (密级)和 sid (行号)

df1 = df.select(
    "*", 
    F.dense_rank().over(w1).alias('dr'), 
    F.row_number().over(w2).alias('sid')
)
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
|    John|123|00015|    1|  1|  1|
|    John|123|00016|    2|  1|  2|
|    John|345|00205|    3|  2|  1|
|    John|345|00206|    4|  2|  2|
|    John|789|00283|    5|  3|  1|
|    John|789|00284|    6|  3|  2|
|    John|789|00285|    7|  3|  3|
+--------+---+-----+-----+---+---+

找到 max(dr) ,这样我们就可以预先定义要以其为中心的列表 range(1,N+1) (这将提高 pivot 方法)。

N = df1.agg(F.max('dr')).first()[0]

groupby公司 Customer , sid 以…为轴心 dr 然后进行聚合:

df_new = df1.groupby('Customer','sid') \
    .pivot('dr', range(1,N+1)) \
    .agg(
        F.first('ID').alias('ID'),
        F.first('unit').alias('unit'),
        F.first('order').alias('order')
)

df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+

如果需要,请重命名列名:

import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
jpfvwuh4

jpfvwuh42#

下面是我的解决方案。。做排名然后把结果展平。

df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

rankedDF = df.withColumn("rank", row_number().over(Window.partitionBy("customer").orderBy("order")))
w1 = Window.partitionBy("customer").orderBy("order")
groupedDF = rankedDF.select("customer", "rank", collect_list("ID").over(w1).alias("ID"), collect_list("unit").over(w1).alias("unit"), collect_list("order").over(w1).alias("seq_num")).groupBy("customer", "rank").agg(max("ID").alias("ID"), max("unit").alias("unit"), max("seq_num").alias("seq_num") )    
groupedColumns = [col("customer")]
pivotColumns = map(lambda i:map(lambda a:col(a)[i-1].alias(a + "_" + `i`), ["ID", "unit", "seq_num"]), [1,2,3])
flattenedCols = [item for sublist in pivotColumns for item in sublist]
finalDf=groupedDF.select(groupedColumns + flattenedCols)
aelbi1ox

aelbi1ox3#

可能有多种方法可以做到这一点,但Pandas自定义项可以是这样一种方式。以下是一个基于您的数据的玩具示例:

df = pd.DataFrame({'Customer': ['John']*6, 
                   'ID': [123]*2 + [345]*2 + [789]*2, 
                   'unit': ['00015', '00016', '00205', '00206', '00283', '00284'], 
                   'order': range(1, 7)})
sdf = spark.createDataFrame(df)

# Spark 2.4 syntax. Spark 3.0 is less verbose

return_types = 'state string, ID_1 int, unit_1 string, seq_num_1 int, ID_2int, unit_2 string, seq_num_2 int, ID_3 int, unit_3 string, seq_num_3 int'
@pandas_udf(returnType=return_types, functionType=PandasUDFType.GROUPED_MAP)
def convert_to_wide(pdf):
    groups = pdf.groupby('ID')
    out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
    out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2', 'ID_3', 'unit_3', 'seq_num_3']
    return out

sdf.groupby('Customer').apply(convert_to_wide).show()

+-----+----+------+---------+----+------+---------+----+------+---------+
|state|ID_1|unit_1|seq_num_1|ID_2|unit_2|seq_num_2|ID_3|unit_3|seq_num_3|
+-----+----+------+---------+----+------+---------+----+------+---------+
| John| 123| 00015|        1| 345| 00205|        3| 789| 00283|        5|
| John| 123| 00016|        2| 345| 00206|        4| 789| 00284|        6|
+-----+----+------+---------+----+------+---------+----+------+---------+

相关问题