我正在尝试将过滤后的Dataframe保存回同一源文件。
我编写了下面的代码,将目录中每个文件的内容转换为单独的Dataframe,对其进行过滤并将其保存回同一个文件
rdd = sparkSession.sparkContext.wholeTextFiles("/content/sample_data/test_data")
# collect the RDD to a list
list_elements = rdd.collect()
for element in list_elements:
path, data = element
df = spark.read.json(spark.sparkContext.parallelize([data]))
df = df.filter('d != 721')
df.write.save(path, format="json", mode="overwrite")
我原以为它会用更新的数据覆盖文件,但它正在创建一个具有文件名的文件夹,并创建以下结构和零件文件:
如何将每个更新的Dataframe保存回同一源文件(.txt)?提前谢谢。
1条答案
按热度按时间hlswsv351#
要将其保存到一个文件,请使用
.coalesce(1)
或者.repartition(1)
之前的选项.save()
,这将产生相同的类似文件夹的结构,但内部将有1个json文件。保存后,要用“普通”名称保存它,您需要剪切内部的1 json文件,粘贴并用所需名称重命名它。您可以在这里看到csv文件的代码