Pyspark:不能在spark作业中使用dbutils

9jyewag0  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(161)

我正在尝试在Databricks中并行执行文件复制。使用多个执行器是一种方法。因此,这是我在pyspark中编写的代码。

def parallel_copy_execution(src_path: str, target_path: str):
  files_in_path = dbutils.fs.ls(src_path)
  file_paths_df = spark.sparkContext.parallelize(files_in_path).toDF()
  file_paths_df.foreach(lambda x: dbutils.fs.cp(x.path.toString(), target_path, recurse=True))

字符串
我获取了所有要复制的文件并创建了一个DataFrame。当试图在DataFrame上运行foreach时,我得到了以下错误。它说You cannot use dbutils within a spark job

You cannot use dbutils within a spark job or otherwise pickle it.
            If you need to use getArguments within a spark job, you have to get the argument before
            using it in the job. For example, if you have the following code:

              myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))

            Then you should use it this way:

              argX = dbutils.args.getArgument("X")
              myRdd.map(lambda i: argX + str(i))


但是当我在Scala中尝试同样的方法时,它工作得很好。dbutils在一个spark job中使用。附加这段代码也是如此。

def parallel_copy_execution(p: String, t: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    dbutils.fs.cp(file(0).toString,t , recurse=true)
    println(s"cp file: $file")
  }
}


Pyspark的API没有更新来处理这个问题吗?

mrfwxfqh

mrfwxfqh1#

你应该能够使用dbfs FUSE挂载来完成拷贝,而不是dbutils.fs.cp(),你只需要从操作系统路径/dbfs/<rest-of-path>做一个普通的python文件拷贝。
所以在你的foreach里面,

dbutils.fs.cp('dbfs:/mnt/src/file.txt', 'dbfs:/mnt/tgt/file.txt')

字符串

shutil.copyfile('/dbfs/mnt/src/file.txt', '/dbfs/mnt/tgt/file.txt')

相关问题