sparksql:访问当前工作节点目录中的文件

5uzkadbs  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(376)

我需要使用sparksql读取一个文件,并且该文件位于当前目录中。
我使用这个命令来解压存储在hdfs上的文件列表。

val decompressCommand = Seq(laszippath, "-i", inputFileName , "-o", "out.las").!!

文件在当前工作节点目录中输出,我知道这是因为 "ls -a"!! 通过scala我可以看到文件在那里。然后,我尝试使用以下命令访问它:

val dataFrame = sqlContext.read.las("out.las")

我假设sql上下文会尝试在当前目录中查找该文件,但事实并非如此。此外,它不会抛出错误,但会发出警告,指出找不到该文件(因此spark将继续运行)。
我尝试使用以下方法添加文件: sparkContext.addFile("out.las") 然后使用以下方法访问位置: val location = SparkFiles.get("out.las") 但这也不管用。
我甚至执行了命令 val locationPt = "pwd"!! 然后就这么做了 val fullLocation = locationPt + "/out.las" 并试图使用该值,但也不起作用。
引发的实际异常如下所示:

User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve 'x' given input columns: [];
org.apache.spark.sql.AnalysisException: cannot resolve 'x' given input columns: []

当我试图从Dataframe访问列“x”时,就会发生这种情况。我知道“x”列的存在是因为我从hdfs下载了一些文件,在本地解压并运行了一些测试。
我需要逐个解压文件,因为我有1.6tb的数据,所以我不能一次解压,以后再访问它们。
有人能告诉我如何访问输出到worker节点目录的文件吗?或者我应该换个方式?

byqmnocz

byqmnocz1#

我以前使用过hadoopapi来获取文件,我不知道它是否能帮助您。

val filePath = "/user/me/dataForHDFS/"
val fs:FileSystem = FileSystem.get(new java.net.URI(filePath + "out.las"), sc.hadoopConfiguration)

我还没有测试过下面的内容,但我很确定我是在非法地将java数组传递给scala。但我只是想知道之后该怎么做。

var readIn: Array[Byte] = Array.empty[Byte]
val fileIn: FSDataInputStream = fs.open(file)
val fileIn.readFully(0, readIn)
wf82jlnq

wf82jlnq2#

所以我现在设法做到了。我要做的是将文件保存到hdfs,然后通过hdfs使用sql上下文检索文件。我在hdfs中每次都覆盖“out.las”,这样就不必占用太多空间。

相关问题