如何使用Pyspark在ADLS中写入.csv文件

u3r8eeie  于 2022-11-21  发布在  Spark
关注(0)|答案(2)|浏览(154)

我 正在 从 ADLS 读取 json 文件 , 然后 通过 将 扩展 名 更改 为 . csv 将 其 写 回 ADLS , 但 在 ADLS 中 创建 了 一些 随机 文件 名 ( 在 Azure Synapse 中 编写 脚本 )
One _ success 文件 和 part - 000 - * * * . csv 类似 于 此 的 一些 随机 文件 名 正在 生成
我 希望 将 文件 名 保存 为 ex :sfmc.json 它 应该 以 sfmc.csv 的 形式 写入 adls 中

dphi5xsq

dphi5xsq1#

这 就是 spark 中 不同 分区 的 数据 持久 化 的 方式 。 你 可以 使用 databricks fs 工具 来 重 命名 文件 。
我 写 了 一 个 小 的 实用 程序 函数 来 收集 一 个 分区 上 的 所有 数据 , 作为 parquet 持久 化 , 并 重 命名 文件 夹 中 唯一 的 数据 文件 。 您 可以 将 其 用于 JSON 或 CSV 。 该 实用 程序 接受 文件 夹 路径 和 文件 名 , 创建 一 个 " tmp " 文件 夹 用于 持久 化 , 然后 将 文件 移动 并 重 命名 到 所 需 的 文件 夹 :

  1. def export_spark_df_to_parquet(df, dir_dbfs_path, parquet_file_name):
  2. tmp_parquet_dir_name = "tmp"
  3. tmp_parquet_dir_dbfs_path = dir_dbfs_path + "/" + tmp_parquet_dir_name
  4. parquet_file_dbfs_path = dir_dbfs_path + "/" + parquet_file_name
  5. # Export dataframe to Parquet
  6. df.repartition(1).write.mode("overwrite").parquet(tmp_parquet_dir_dbfs_path)
  7. listFiles = dbutils.fs.ls(tmp_parquet_dir_dbfs_path)
  8. for _file in listFiles:
  9. if len(_file.name) > len(".parquet") and _file.name[-len(".parquet"):] == ".parquet":
  10. dbutils.fs.cp(_file.path, parquet_file_dbfs_path)
  11. break

中 的 每 一 个
用法 :

  1. export_spark_df_to_parquet(df, "dbfs:/my_folder", "my_df.parquet")

格式

展开查看全部
bqucvtff

bqucvtff2#

Spark不允许按要求命名文件。它会生成随机文件名的部分文件。当我使用df.write(其中df是spark Dataframe )时,我得到一个随机生成的文件名。

  • 如果你想生成一个特定名称的文件名,你必须使用Pandas,使用toPandas()将spark Dataframe 转换为Pandas Dataframe ,然后使用to_csv()方法保存文件(考虑csv作为所需的文件格式)。
  1. pdf = df.toPandas()
  2. pdf.to_csv("abfss://data@datalk0711.dfs.core.windows.net/output/output.csv")

  • 运行上述代码生成了具有所需文件名的所需文件。

相关问题