pyspark: flattening records from an input file

hivapdat · posted 2021-05-16 in Spark

I have an input CSV file like the following:

plant_id    system1_id  system2_id system3_id
A1          s1-111      s2-111     s3-111
A2          s1-222      s2-222     s3-222
A3          s1-333      s2-333     s3-333

I want to flatten them into the following records:

plant_id    system_id     system_name   
A1          s1-111        system1
A1          s2-111        system2
A1          s3-111        system3
A2          s1-222        system1
A2          s2-222        system2
A2          s3-222        system3
A3          s1-333        system1
A3          s2-333        system2
A3          s3-333        system3

Currently I can do this by creating a transposed pyspark df for each system column and then performing a union of all the dfs at the end, but that requires writing quite a lot of code. Is there a way to do it in just a few lines?
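For reference, the verbose approach described above might look roughly like this (a sketch only - the per-column selects and the final union are my assumptions, not the original code):

from functools import reduce
from pyspark.sql import functions as F

# One narrow select per system column, then union them all together.
parts = [
    df.select(
        'plant_id',
        F.col(f'system{i}_id').alias('system_id'),
        F.lit(f'system{i}').alias('system_name'),
    )
    for i in (1, 2, 3)
]
flattened = reduce(lambda a, b: a.unionByName(b), parts)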


wfveoks01#

1. Prepare the sample input data:

from pyspark.sql import functions as F
sampleData = (('A1','s1-111','s2-111','s3-111'),
              ('A2','s1-222','s2-222','s3-222'),
              ('A3','s1-333','s2-333','s3-333'))

2. Create the list of input column names:

columns = ['plant_id','system1_id','system2_id','system3_id']

3. Create the Spark DataFrame:

df = spark.createDataFrame(data=sampleData, schema=columns)
df.show()
+--------+----------+----------+----------+
|plant_id|system1_id|system2_id|system3_id|
+--------+----------+----------+----------+
|      A1|    s1-111|    s2-111|    s3-111|
|      A2|    s1-222|    s2-222|    s3-222|
|      A3|    s1-333|    s2-333|    s3-333|
+--------+----------+----------+----------+

4. We use the stack() function to split multiple columns into rows. The stack function syntax is: stack(n, expr1, ..., exprk) - splits expr1, ..., exprk into n rows.

finalDF = df.select('plant_id', F.expr("stack(3, system1_id, 'system1', system2_id, 'system2', system3_id, 'system3') as (system_id, system_name)"))

finalDF.show()
+--------+---------+-----------+
|plant_id|system_id|system_name|
+--------+---------+-----------+
|      A1|   s1-111|    system1|
|      A1|   s2-111|    system2|
|      A1|   s3-111|    system3|
|      A2|   s1-222|    system1|
|      A2|   s2-222|    system2|
|      A2|   s3-222|    system3|
|      A3|   s1-333|    system1|
|      A3|   s2-333|    system2|
|      A3|   s3-333|    system3|
+--------+---------+-----------+
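As a side note (not part of the original answer): on Spark 3.4+ the built-in DataFrame.unpivot does the same reshape. It emits the source column names as the variable column, so the '_id' suffix has to be stripped afterwards:

from pyspark.sql import functions as F

# Spark 3.4+ only: unpivot is the DataFrame API counterpart of the stack() expression.
unpivoted = df.unpivot(
    ids=['plant_id'],
    values=['system1_id', 'system2_id', 'system3_id'],
    variableColumnName='system_name',
    valueColumnName='system_id',
)
# unpivot yields the column names ('system1_id', ...) as system_name;
# trim the '_id' suffix and reorder columns to match the desired output.
unpivoted = unpivoted.withColumn(
    'system_name', F.regexp_replace('system_name', '_id$', '')
).select('plant_id', 'system_id', 'system_name')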

yptwkmov2#

Use stack:

df2 = df.selectExpr(
    'plant_id',
    """stack(
         3,
         system1_id, 'system1', system2_id, 'system2', system3_id, 'system3')
         as (system_id, system_name)"""
)

df2.show()
+--------+---------+-----------+
|plant_id|system_id|system_name|
+--------+---------+-----------+
|      A1|   s1-111|    system1|
|      A1|   s2-111|    system2|
|      A1|   s3-111|    system3|
|      A2|   s1-222|    system1|
|      A2|   s2-222|    system2|
|      A2|   s3-222|    system3|
|      A3|   s1-333|    system1|
|      A3|   s2-333|    system2|
|      A3|   s3-333|    system3|
+--------+---------+-----------+
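If there are many system columns, the stack expression can also be generated from the column list instead of written out by hand. A sketch (my addition, not from the answer; it assumes every non-key column follows the systemN_id naming and Python 3.9+ for removesuffix):

# Build the stack() expression programmatically so it scales to any number of columns.
sys_cols = [c for c in df.columns if c != 'plant_id']
pairs = ', '.join(f"{c}, '{c.removesuffix('_id')}'" for c in sys_cols)
df2 = df.selectExpr(
    'plant_id',
    f"stack({len(sys_cols)}, {pairs}) as (system_id, system_name)",
)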
