是否可以(更有效地)使用PySpark读入不同的行?

h5qlskok  于 2023-03-17  发布在  Spark
关注(0)|答案(1)|浏览(158)

背景:

PySpark和大数据新手。我的数据中10%的行(从CSV读入Pandas数据框)是重复的,并且pd.drop_duplicates()没有删除所有重复的行,我的故障排除步骤是使用splitting & concat,因为这个问题似乎与数据的大小有关(+13M行)。但是这看起来效率很低,特别是因为我以后必须在许多类似大小的CSV上运行这个操作。一些研究表明,使用PySpark并行化操作可能会带来一个整体上更高效的解决方案。

**问题:**从大型CSV文件(+1GB)阅读数据以删除重复行的有效方法是什么?

是否可以使用PySpark从CSV读入不同的行?这是否比先阅读整个数据集,然后应用PySpark distinct(),再写入新文件更有效?我的直觉是,使用并行化只读入不同的行,将减少每次迭代期间使用的内存量--或者可能我遗漏了一些关键概念,这些概念会使这一点不准确。

代码示例:

--更新--我终于安装了pyspark和它的依赖项--看起来是正确的--并在测试 Dataframe 上运行了下面的代码。遇到了很长的ConnectionRefusedError消息和一个任务大小超过建议大小的警告。可能是因为我没有包含任何代码来并行化操作(????)。大脑需要休息一下。

下面是我对第二个选项如何在一个CSV上工作(读入数据后获取不同的行)的理解:

df = pd.read_csv("<data>.csv", sep=",")

# create spark session
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# create spark dataframe
df_spark = spark.createDataFrame(df)

# display distinct data
df_distinct = df_spark.distinct()

# write de-duplicated data to new CSV
df_distinct.to_csv("<filename>.csv", index=False)

我尝试过拆分数据,然后使用drop_duplicates()并连接到一个新的 Dataframe 中,我还研究了spark.read.format和其他PySpark方法。

jhkqcmku

jhkqcmku1#

不要使用Pandas来读取csv,直接使用spark。

df_distinct = spark.read.csv("<data>.csv").distinct()

然后写入:

df_distinct.write.csv("filename>.csv")

你必须能够读取整个文件,但是spark需要读取数据,以找出不同的记录是什么。不能绕过一个csv文件加载到Spark。

相关问题