如何在spark中执行groupconcat?

syqv5f0l  于 2021-07-26  发布在  Java
关注(0)|答案(1)|浏览(410)

我使用的spark版本是2.4。我想有一个像下表这样的结果,有没有什么好办法呢?我一直在尝试使用group\u concat并将它们与分开,但是有没有最好的方法呢?
我们想结合df_1和df_2生成df_3。


**DF_1**

|key|value | 
|:----: |:------:|
| key_1  |test_1    |
| key_1  |test_2   |
| key_1  |test_3    |
| key_2  |test_1    |
| key_2  |test_2    |

**DF_2**

|key|value | 
|:----: |:------:|
| key_1  |value   |
| key_1  |value   |

**DF_3**

|key|value | test_1|test_2|test_3|
|:----: |:------:|:------:|:------:|:------:|
| key_1  |value    |test_1|test_2|test_3
| key_2  |value    |test_1|test_2|null
wn9m85ua

wn9m85ua1#

根据你的问题,这是我能想到的最接近的了-
这样做的目的是使 temp_df1 dataframe将行转置到各个列,然后 jointemp_df2 获取所需的 value 领域

import pyspark

from pyspark.sql import SQLContext
import pyspark.sql.functions as F

temp_df1 = spark.createDataFrame(
    [
        ("A","Test1"),
        ("A","Test2"),
        ("A","Test3"),
        ("B","Test1"),
        ("B","Test2"),
    ],
    ["key","value"]
)

temp_df1 = temp_df1.groupby("key").pivot("value").agg(F.first("value"))#.show()

temp_df2 = spark.createDataFrame(
    [
        ("A","Value1"),
        ("A","Value2"),
        ("B","Value1"),
        ("B","Value2"),
    ],
    ["key","value"]
)

temp_df3 = temp_df1.join(temp_df2,temp_df1['key'] == temp_df2['key'],'inner').select(
                                                  temp_df1['*']
                                                  ,temp_df2['value']
  )

temp_df3.show()

+---+-----+-----+-----+------+
|key|Test1|Test2|Test3| value|
+---+-----+-----+-----+------+
|  A|Test1|Test2|Test3|Value1|
|  A|Test1|Test2|Test3|Value2|
|  B|Test1|Test2| null|Value1|
|  B|Test1|Test2| null|Value2|
+---+-----+-----+-----+------+

相关问题