我有这样一个 Dataframe :
column_1 column_2 column_3 column_4 column_5 column_6 column_7
34432 apple banana mango pine lemon j93jk84
98389 grape orange pine kiwi cherry j93jso3
94749 apple banana mango pine lemon ke03jfr
48948 apple banana mango pine lemon 9jef3f4
. . . . . . .
90493 pear apricot papaya plum lemon 93jd30d
90843 grape orange pine kiwi cherry 03nd920
我想要两个 Dataframe 。
Dataframe _1:
我希望忽略column_1和column_7,删除所有重复数据,并仅保留基于所有其他列的唯一行。
Dataframe _2:
column_1 column_2 column_3 column_4 column_5 column_6 column_7 type Tag
34432 apple banana mango pine lemon j93jk84 unique 1
98389 grape orange pine kiwi cherry j93jso3 unique 2
94749 apple banana mango pine lemon ke03jfr duplicated 1
48948 apple banana mango pine lemon 9jef3f4 duplicated 1
. . . . . . .
90493 pear apricot papaya plum lemon 93jd30d unique 3
90843 grape orange pine kiwi cherry 03nd920 duplicated 2
正如您在示例daraframe_2中所看到的,我需要两个新列的"type",它指定行是唯一的还是重复的."tag",以便轻松地识别哪个是唯一的行,哪个是属于该重复行的其他重复行
有人能告诉我,如何在pyspark中实现这两个 Dataframe 吗?
- 我尝试的代码:**
# to drop the duplicates ignoring column_1 and column_7
df_unique = df.dropDuplicates(["column_6","column_2","column_3","column_4","column_5"])
df_duplicates = df.subtract(df_unique)
# adding a type column to both dataframes and concatinating two dataframes
df_unique = df_unique.withColumn("type", F.lit("unique"))
df_duplicates = df_duplicated.withColumn("type", F.lit("duplicate"))
df_combined = df_unique.unionByName(df_duplicates )
# unable to create the tag column
..
2条答案
按热度按时间ubbxdtey1#
如果我没理解错你的问题,基本上你需要-
1.将第一行标记为唯一
1.如果除column_1和column_2外所有列的值都相同,则将所有后续行标记为重复
使用
row_number
:使用所有列作为分区键进行比较,并为每个分区生成行号,如果一组列值有更多行-它们将属于同一组,并相应地具有row_number。(如果需要,您可以使用orderBy
将特定行标记为唯一):我已经收集了所有列的值,以便在一个数组中进行比较。
输入:
输出:
wrrgggsh2#
我做了类似这样的事情,这里是输入:
对于第一个df,可以使用dropDuplicates,因为可以传递列的子集
对于第二个df,你可以这样做:
产出