我正在尝试在Databricks中并行执行文件复制。使用多个执行器是一种方法。因此,这是我在pyspark中编写的代码。
def parallel_copy_execution(src_path: str, target_path: str):
files_in_path = dbutils.fs.ls(src_path)
file_paths_df = spark.sparkContext.parallelize(files_in_path).toDF()
file_paths_df.foreach(lambda x: dbutils.fs.cp(x.path.toString(), target_path, recurse=True))
字符串
我获取了所有要复制的文件并创建了一个DataFrame。当试图在DataFrame上运行foreach时,我得到了以下错误。它说You cannot use dbutils within a spark job
You cannot use dbutils within a spark job or otherwise pickle it.
If you need to use getArguments within a spark job, you have to get the argument before
using it in the job. For example, if you have the following code:
myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))
Then you should use it this way:
argX = dbutils.args.getArgument("X")
myRdd.map(lambda i: argX + str(i))
型
但是当我在Scala中尝试同样的方法时,它工作得很好。dbutils在一个spark job中使用。附加这段代码也是如此。
def parallel_copy_execution(p: String, t: String): Unit = {
dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
dbutils.fs.cp(file(0).toString,t , recurse=true)
println(s"cp file: $file")
}
}
型
Pyspark的API没有更新来处理这个问题吗?
1条答案
按热度按时间mrfwxfqh1#
你应该能够使用dbfs FUSE挂载来完成拷贝,而不是
dbutils.fs.cp()
,你只需要从操作系统路径/dbfs/<rest-of-path>
做一个普通的python文件拷贝。所以在你的
foreach
里面,字符串
做
型