Apache Spark: creating DataFrames with duplicate and non-duplicate rows

Asked by s5a0g9ez on 2023-02-13 in Apache

I have a DataFrame like this:

column_1   column_2   column_3   column_4  column_5  column_6  column_7
 34432      apple      banana     mango     pine     lemon     j93jk84
 98389      grape      orange     pine      kiwi     cherry    j93jso3
 94749      apple      banana     mango     pine     lemon     ke03jfr
 48948      apple      banana     mango     pine     lemon     9jef3f4
  .         .          .          .         .        .         .       
 90493      pear       apricot    papaya    plum     lemon     93jd30d
 90843      grape      orange     pine      kiwi     cherry    03nd920

I want two DataFrames.
DataFrame_1:
Ignoring column_1 and column_7, drop all duplicates and keep only the rows that are unique based on all the other columns.
DataFrame_2:

column_1   column_2   column_3   column_4  column_5  column_6  column_7  type          Tag
 34432      apple      banana     mango     pine     lemon     j93jk84   unique         1
 98389      grape      orange     pine      kiwi     cherry    j93jso3   unique         2
 94749      apple      banana     mango     pine     lemon     ke03jfr   duplicated     1
 48948      apple      banana     mango     pine     lemon     9jef3f4   duplicated     1
  .         .          .          .         .        .         .       
 90493      pear       apricot    papaya    plum     lemon     93jd30d   unique         3
 90843      grape      orange     pine      kiwi     cherry    03nd920   duplicated     2

As you can see in the example dataframe_2, I need two new columns: "type", which indicates whether a row is unique or duplicated, and "tag", which makes it easy to identify which row is the unique one and which duplicated rows belong to it.
Can someone tell me how to build these two DataFrames in PySpark?

**Code I tried:**
# imports needed by the snippet
import pyspark.sql.functions as F

# to drop the duplicates ignoring column_1 and column_7
df_unique = df.dropDuplicates(["column_2", "column_3", "column_4", "column_5", "column_6"])

df_duplicates = df.subtract(df_unique)

# adding a type column to both dataframes and concatenating the two dataframes
df_unique = df_unique.withColumn("type", F.lit("unique"))
df_duplicates = df_duplicates.withColumn("type", F.lit("duplicate"))
df_combined = df_unique.unionByName(df_duplicates)

# unable to create the tag column
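For the missing tag column, one possible approach (a sketch, not from the original post; group_cols and tag_window are hypothetical names) is to rank the combination of compared columns with dense_rank, so every row sharing the same values in column_2..column_6 receives the same tag:

from pyspark.sql import Window

# hypothetical sketch: one tag per distinct combination of the compared columns
group_cols = ["column_2", "column_3", "column_4", "column_5", "column_6"]
tag_window = Window.orderBy(*group_cols)  # no partition: ranks every distinct key combination globally

df_combined = df_combined.withColumn("tag", F.dense_rank().over(tag_window))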

ubbxdtey1#

If I understood your question correctly, you essentially need to:
1. Mark the first row as unique
2. Mark all subsequent rows as duplicated if they have the same values in all columns except column_1 and column_7

**Let me know if that is not the case.**

Using row_number: use all the compared columns as the partition key and generate a row number within each partition. If a combination of column values has more than one row, those rows fall into the same group and get row numbers accordingly (you can use orderBy to control which row is marked as the unique one, if needed):

df.withColumn("asArray", F.array(*[x for x in df.schema.names if x!="column_1" and x!="column_7"]))\
.withColumn("rn", F.row_number().over(Window.partitionBy("asArray").orderBy(F.lit("dummy"))))\
.withColumn("type", F.when(F.col("rn")==1, "Unique").otherwise("Duplicated"))\
.withColumn("tag", F.dense_rank().over(Window.orderBy(F.col("asArray"))))\
.show(truncate=False)

I collected the values of all the compared columns into a single array so they can be compared as one value.

**Edit: output shown for data similar to your dataset, with more than 2 duplicates in a group. Also corrected the tag logic.**

Input and output: see the screenshots in the original post (not reproduced here).
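An equivalent variant (a sketch, not part of the original answer; the column list and window definitions below are assumptions based on the question's schema) partitions and orders directly on the compared columns instead of packing them into an array:

import pyspark.sql.functions as F
from pyspark.sql import Window

compare_cols = [c for c in df.schema.names if c not in ("column_1", "column_7")]

# same logic as above, using the grouping columns directly as partition/order keys
result = (
    df.withColumn("rn", F.row_number().over(Window.partitionBy(*compare_cols).orderBy(F.lit("dummy"))))
      .withColumn("type", F.when(F.col("rn") == 1, "Unique").otherwise("Duplicated"))
      .withColumn("tag", F.dense_rank().over(Window.orderBy(*compare_cols)))
      .drop("rn")
)
result.show(truncate=False)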

wrrgggsh2#

I did something like this; here is the input:

import pyspark.sql.functions as F
from pyspark.sql import Window

data = [
    {"column_1": 34432, "column_2": "apple", "column_3": "banana", "column_4": "mango", "column_5": "pine", "column_6": "lemon", "column_7": "j93jk84"},
    {"column_1": 98389, "column_2": "grape", "column_3": "orange", "column_4": "pine", "column_5": "kiwi", "column_6": "cherry", "column_7": "j93jso3"},
    {"column_1": 94749, "column_2": "apple", "column_3": "banana", "column_4": "mango", "column_5": "pine", "column_6": "lemon", "column_7": "ke03jfr"},
    {"column_1": 48948, "column_2": "grape", "column_3": "banana", "column_4": "mango", "column_5": "pine", "column_6": "lemon", "column_7": "9jef3f4"},
    {"column_1": 90493, "column_2": "pear", "column_3": "apricot", "column_4": "papaya", "column_5": "plum", "column_6": "lemon", "column_7": "93jd30d"},
    {"column_1": 90843, "column_2": "grape", "column_3": "orange", "column_4": "pine", "column_5": "kiwi", "column_6": "cherry", "column_7": "03nd920"}
]

df = spark.createDataFrame(data)

For the first DataFrame you can use dropDuplicates, since it accepts a subset of columns:

firstDf = df.dropDuplicates(["column_2","column_3","column_4","column_5","column_6"])

For the second DataFrame you can do something like this:

windowSpec = Window.partitionBy("column_2","column_3","column_4","column_5","column_6").orderBy("column_1")

secondDf = (
    df.withColumn("row_number", F.row_number().over(windowSpec))
    .withColumn(
        "type",
        F.when(F.col("row_number") == F.lit(1), F.lit("unique")).otherwise(
            F.lit("duplicated")
        ),
    )
    # first column_1 within each group identifies the group; dense_rank maps it to a sequential tag
    .withColumn("tag", F.first("column_1").over(windowSpec))
    .withColumn("tag", F.dense_rank().over(Window.partitionBy().orderBy("tag")))
    .drop("row_number")
)
secondDf.show()

Output:

+--------+--------+--------+--------+--------+--------+--------+----------+---+
|column_1|column_2|column_3|column_4|column_5|column_6|column_7|      type|tag|
+--------+--------+--------+--------+--------+--------+--------+----------+---+
|   34432|   apple|  banana|   mango|    pine|   lemon| j93jk84|    unique|  1|
|   94749|   apple|  banana|   mango|    pine|   lemon| ke03jfr|duplicated|  1|
|   48948|   grape|  banana|   mango|    pine|   lemon| 9jef3f4|    unique|  2|
|   90493|    pear| apricot|  papaya|    plum|   lemon| 93jd30d|    unique|  3|
|   90843|   grape|  orange|    pine|    kiwi|  cherry| 03nd920|    unique|  4|
|   98389|   grape|  orange|    pine|    kiwi|  cherry| j93jso3|duplicated|  4|
+--------+--------+--------+--------+--------+--------+--------+----------+---+
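A possible follow-up, sketched under the assumption that secondDf is the tagged DataFrame built above (the name firstDf_from_tagged is mine): DataFrame_1 from the question can also be derived from the same result by keeping only the rows marked as unique, instead of calling dropDuplicates separately.

# keep only the rows tagged "unique", then drop the helper columns
firstDf_from_tagged = secondDf.filter(F.col("type") == "unique").drop("type", "tag")
firstDf_from_tagged.show()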
