pyspark 在spark中写入时如何更改csv文件名?

gpnt7bae  于 2023-03-01  发布在  Spark
关注(0)|答案(3)|浏览(332)

我正在尝试在代码中重命名文件

from pyspark.sql import *
from IPython.core.display import display, HTML

display(HTML("<style>.container { width:100% !important; }</style>"))

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option") \
    .getOrCreate()
    
df = spark.read.csv("../work/data2/*.csv", inferSchema=True, header=False)

df.createOrReplaceTempView("iris")
result = spark.sql("select * from iris where _c1 =2 order by _c0 ")
summary=result.describe(['_c10'])
summary.show()
summary.coalesce(1).write.csv("202003/data1_0331.csv")

.write.csv(“202003/data1_0331.csv”)在此代码中,我的spark创建了所有内容文件夹
结果

"202003/data1_0331.csv/part-00000-3afd3298-a186-4289-8ba3-3bf55d27953f-c000.csv

我想要的结果是

202003/data1_0331.csv

我如何得到我想要的结果?我在这里看到了类似的解决方案,如write.csv(summary,file=“data1_0331”),但我得到了这个错误

cannot resolve '`0`' given input columns
irtuqstp

irtuqstp1#

您无法控制写Spark操作的输出名称。
但是,您始终可以重命名它:

from py4j.java_gateway import java_import

java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(CSVPath))

file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

print(file_name)

fs.rename(sc._jvm.Path(CSVPath+''+file_name), sc._jvm.Path(CSVPath+"data1_0331.csv"))

这段代码将列出输出路径中的所有文件,并查找以part-开头的文件,然后将它们重命名为所需的名称。

8zzbczxx

8zzbczxx2#

Spark使用并行来加速计算,所以Spark尝试为一个CSV写多个文件是正常的,这将加快阅读部分的速度。
所以如果你只使用Spark:保持这样,会更快。
但是,如果您真的想将数据保存为单个CSV文件,您可以使用panda,如下所示:
summary.toPandas().to_csv("202003/data1_0331.csv")

xxe27gdn

xxe27gdn3#

当使用pyspark写入文件时,我们不能强制更改文件名,唯一的方法是在写入文件后,我们可以在函数的帮助下重命名它

source_path = "your source path"
destination_path = "your destination path"

def rename_file_with_location(source_path,destination_path,file_name):
    files = dbutils.fs.ls(source_path)
    csv_file = [x.path for x in files if x.path.endswith(".csv")][0]
    file_name_csv=csv_file.split('/')[4]
    dbutils.fs.mv(csv_file, destination_path + file_name)
    print("File has been renamed from"+source_path+"to this"+destination_path+file_name)

在这个函数的帮助下,你可以重命名pyspark分区的csv文件。

注:-此功能只适用于一个csv文件,您可以通过更改代码的第二行轻松地将其更改为多个,或者如果您不想更改代码,您也可以在一个分区中编写,但它有自己的缺点。

It can be done by using the .coalesce(1) function

相关问题