在scala中并行运行函数

pvabu6sv  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(574)

我有一个sparksql函数,它在hdfs目录中生成temp文件。我想在函数运行时打印所有目录和文件。
下面是函数:

spark.sql(s"INSERT INTO ${table} VALUES ....")

当函数/查询运行时,我想查看在hdfs目录下生成的文件。因为这些文件是临时的,所以我想在查询运行时多次列出目录。

FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path("hdfs:///mypath")).foreach( x => println(x.getPath ))

我是scala编程新手,实在找不到并行运行的方法。

jk9hmnmh

jk9hmnmh1#

当然。你可以把它包起来 spark.sql(query) 在一个 scala.concurrent.Future[Unit] .

import scala.concurrent.ExecutionContext.Implicits.global

val insert = scala.concurrent.Future {
  spark.sql(s"INSERT INTO ${table} VALUES ....")
} // begins to work immediately

然后,当它执行时,您可以看到它创建的文件。

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val fs = FileSystem.get(sc.hadoopConfiguration)
val path = new Path("hdfs:///mypath")
while(!insert.isCompleted){
 Thread.sleep(1000) // Sleep to limit how often your message prints
 fs.listStatus(path).foreach(x => println(x.getPath))
}

请记住,您每次都会查看整个文件列表。

相关问题