如何使用pyspark从pyspark dataframe中的数百万行数据中删除重复数据

k5ifujac  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(114)

下面的代码运行良好,我得到的输出是:Dataframe image
我的示例模式示例:Sample Schema

from pyspark.sql.types import *
from pyspark.sql.functions import *

#Flatten array of structs and structs
def flatten(df):
   # compute Complex Fields (Lists and Structs) in Schema   
   complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   while len(complex_fields)!=0:
      col_name=list(complex_fields.keys())[0]
    
      # if StructType then convert all sub element to columns.
      # i.e. flatten structs
      if (type(complex_fields[col_name]) == StructType):
         expanded = [col(col_name+'.'+k).alias(col_name+'_'+k) for k in [ n.name for n in  complex_fields[col_name]]]
         df=df.select("*", *expanded).drop(col_name)
    
      # if ArrayType then add the Array Elements as Rows using the explode function
      # i.e. explode Arrays
      elif (type(complex_fields[col_name]) == ArrayType):    
         df=df.withColumn(col_name,explode_outer(col_name))
    
      # recompute remaining Complex Fields in Schema       
      complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   return df

flattened_df = flatten(df)
df1 = flattened_df
df1 = df1.select(
    df1["_source_Vrvirksomhed_cvrNummer"],
    df1["_source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_vaerdi"],
    df1["_source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_periode_gyldigFra"],
    df1["_source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_periode_gyldigTil"] )
df2 = df1.filter(df1._source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_vaerdi == "REVISION")
df2 = df2[(df2._source_Vrvirksomhed_cvrNummer == 10088437)]

然而,从上面链接中的图像来看。你可以看到我有很多副本。我想删除准确和快速的方式重复
我已经尝试

  • df2.dropduplicates()
  • df2.distinct()
  • 重新分区 Dataframe
  • 缓存 Dataframe

他们都花了20多分钟来运行,没有任何成功。
有人能帮我删除重复的数据很容易数百万行

zi8p0yeb

zi8p0yeb1#

dropDuplicates是删除重复项的方法,是的,这很慢。所以一个更好的方法是不要在一开始就创建副本。也许你可以通过在爆炸前先调用array_distinct来限制重复数?这一切都高度依赖于您的输入数据。

相关问题