背景:
PySpark和大数据新手。我的数据中10%的行(从CSV读入Pandas数据框)是重复的,并且pd.drop_duplicates()没有删除所有重复的行,我的故障排除步骤是使用splitting & concat,因为这个问题似乎与数据的大小有关(+13M行)。但是这看起来效率很低,特别是因为我以后必须在许多类似大小的CSV上运行这个操作。一些研究表明,使用PySpark并行化操作可能会带来一个整体上更高效的解决方案。
**问题:**从大型CSV文件(+1GB)阅读数据以删除重复行的有效方法是什么?
是否可以使用PySpark从CSV读入不同的行?这是否比先阅读整个数据集,然后应用PySpark distinct(),再写入新文件更有效?我的直觉是,使用并行化只读入不同的行,将减少每次迭代期间使用的内存量--或者可能我遗漏了一些关键概念,这些概念会使这一点不准确。
代码示例:
--更新--我终于安装了pyspark和它的依赖项--看起来是正确的--并在测试 Dataframe 上运行了下面的代码。遇到了很长的ConnectionRefusedError消息和一个任务大小超过建议大小的警告。可能是因为我没有包含任何代码来并行化操作(????)。大脑需要休息一下。
下面是我对第二个选项如何在一个CSV上工作(读入数据后获取不同的行)的理解:
df = pd.read_csv("<data>.csv", sep=",")
# create spark session
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# create spark dataframe
df_spark = spark.createDataFrame(df)
# display distinct data
df_distinct = df_spark.distinct()
# write de-duplicated data to new CSV
df_distinct.to_csv("<filename>.csv", index=False)
我尝试过拆分数据,然后使用drop_duplicates()并连接到一个新的 Dataframe 中,我还研究了spark.read.format和其他PySpark方法。
1条答案
按热度按时间jhkqcmku1#
不要使用Pandas来读取csv,直接使用spark。
然后写入:
你必须能够读取整个文件,但是spark需要读取数据,以找出不同的记录是什么。不能绕过一个csv文件加载到Spark。