The code below runs fine, and this is the output I get: Dataframe image
A sample of my schema: Sample Schema
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Flatten arrays of structs and nested structs
def flatten(df):
    # compute complex fields (arrays and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        # if StructType, convert all sub-elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)
        # if ArrayType, add the array elements as rows using explode,
        # i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))
        # recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df

flattened_df = flatten(df)
df1 = flattened_df
df1 = df1.select(
    df1["_source_Vrvirksomhed_cvrNummer"],
    df1["_source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_vaerdi"],
    df1["_source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_periode_gyldigFra"],
    df1["_source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_periode_gyldigTil"])
df2 = df1.filter(df1._source_Vrvirksomhed_deltagerRelation_organisationer_medlemsData_attributter_vaerdier_vaerdi == "REVISION")
df2 = df2[(df2._source_Vrvirksomhed_cvrNummer == 10088437)]
However, as you can see from the image linked above, I have a lot of duplicate rows. I want to remove these duplicates in a way that is both accurate and fast.
I have already tried:

- df2.dropDuplicates()
- df2.distinct()
- repartitioning the DataFrame
- caching the DataFrame

All of them ran for more than 20 minutes without success (a sketch of what each attempt looked like follows below).
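For reference, roughly what each attempt looked like (the partition count of 200 is a placeholder, not a value from my actual run):

# 1) full-row deduplication
df2 = df2.dropDuplicates()
# 2) distinct() is equivalent to dropDuplicates() over all columns
df2 = df2.distinct()
# 3) repartition the DataFrame (200 is a placeholder)
df2 = df2.repartition(200)
# 4) cache the DataFrame before deduplicating
df2 = df2.cache()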
Can someone help me remove duplicates from millions of rows in a simple way?
1 Answer
zi8p0yeb1#
dropDuplicates is the method for removing duplicates, and yes, it is slow. So a better approach is to not create the duplicates in the first place. Maybe you can limit the number of duplicates by calling array_distinct before exploding? All of this depends heavily on your input data.
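A minimal, self-contained sketch of that idea (the id and values columns and the toy data are hypothetical, just to illustrate the pattern): de-duplicating each array with array_distinct before explode_outer means fewer rows are ever created, so any later dropDuplicates has far less work to do.

from pyspark.sql import SparkSession
from pyspark.sql.functions import array_distinct, explode_outer

spark = SparkSession.builder.getOrCreate()

# toy DataFrame whose array column contains duplicate elements
df = spark.createDataFrame([(1, [10, 10, 20]), (2, [30, 30, 30])],
                           ["id", "values"])

# drop duplicate elements inside each array *before* exploding,
# so explode_outer produces already de-duplicated rows:
# (1, 10), (1, 20), (2, 30)
df = df.withColumn("values", explode_outer(array_distinct("values")))
df.show()

In the flatten function above, this would be a one-line change in the ArrayType branch: df = df.withColumn(col_name, explode_outer(array_distinct(col_name))). Note that array_distinct requires Spark 2.4+, and how much it helps depends on whether the duplicates come from repeated array elements or from the cross-product of several exploded arrays.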