在pysparkDataframe中使用write.partitionby时如何删除重复项?

disho6za  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(377)

我有一个如下所示的Dataframe:

|------------|-----------|---------------|---------------|
|    Name    |   Type    |  Attribute 1  |  Attribute 2  |
|------------|-----------|---------------|---------------|
|   Roger    |     A     |     X         |       Y       |
|------------|-----------|---------------|---------------|
|   Roger    |     A     |     X         |       Y       |
|------------|-----------|---------------|---------------|
|   Roger    |     A     |     X         |       Y       |
|------------|-----------|---------------|---------------|
|   Rafael   |     A     |     G         |       H       |
|------------|-----------|---------------|---------------|
|   Rafael   |     A     |     G         |       H       |
|------------|-----------|---------------|---------------|
|   Rafael   |     B     |     G         |       H       |
|------------|-----------|---------------|---------------|

我想分区这个Dataframe并根据名称和类型将其保存到磁盘
代码行现在是这样的,

df.write.partitionBy("Name", "Type").mode("append").csv("output/", header=True)

输出得到正确保存,但是有重复的行,如下所述
在文件夹中
/输出/收到/a

|---------------|---------------|
|  Attribute 1  |  Attribute 2  |
|---------------|---------------|
|     X         |       Y       |
|---------------|---------------|
|     X         |       Y       |
|---------------|---------------|
|     X         |       Y       |
|---------------|---------------|

/输出/rafael/a

|---------------|---------------|
|  Attribute 1  |  Attribute 2  |
|---------------|---------------|
|     G         |       H       |
|---------------|---------------|
|     G         |       H       |
|---------------|---------------|

/输出/rafael/b

|---------------|---------------|
|  Attribute 1  |  Attribute 2  |
|---------------|---------------|
|     G         |       H       |
|---------------|---------------|

如您所见,此csv包含重复项。如何在使用write.partitionby时删除这些重复项?

flvtvl50

flvtvl501#

使用 .distinct() 在写作之前。

df.distinct().write.partitionBy("Name", "Type").mode("append").csv("output/", header=True)

相关问题