scala Spark:仅当路径存在时才读取文件

4uqofj5v  于 2023-01-20  发布在  Scala
关注(0)|答案(5)|浏览(162)

我正在尝试读取scala中路径Sequence中的文件。下面是示例(伪)代码:

val paths = Seq[String] //Seq of paths
val dataframe = spark.read.parquet(paths: _*)

现在,在上面的序列中,一些路径存在,而另一些路径不存在。有没有办法在阅读parquet文件时忽略丢失的路径(以避免org.apache.spark.sql.AnalysisException: Path does not exist)?
我已经尝试了下面的方法,它似乎是工作,但后来,我结束了阅读相同的路径两次,这是我想避免这样做:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)

我检查了DataFrameReaderoptions方法,但它似乎没有任何类似于ignore_if_missing的选项。
此外,这些路径可以是hdfss3(此Seq作为方法参数传递),在阅读时,我不知道路径是s3还是hdfs,因此无法使用s3hdfs特定的API来检查是否存在。

edqdpe6u

edqdpe6u1#

你可以过滤掉不相关的文件,就像@Psidom的回答一样。在spark中,最好的方法是使用内部的spark hadoop配置。假设spark会话变量被称为“spark”,你可以:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def testDirExist(path: String): Boolean = {
  val p = new Path(path)
  hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory
}
val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)
zaq34kh6

zaq34kh62#

先过滤paths怎么样?

paths.filter(f => new java.io.File(f).exists)

例如:

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists)
// res18: List[String] = List(/tmp)
5tmbdcev

5tmbdcev3#

从Spark 2.3.0开始就有一个配置spark.sql.files.ignoreMissingFiles,只要设置为true即可。
https://spark.apache.org/docs/latest/configuration.html

hfyxw5xn

hfyxw5xn4#

PySpark 3.1或更高版本

遗憾的是,在Spark 3.1中还没有任何标志(至少我不知道)来忽略它们。但是你可以尝试这些简单的事情。好消息是加载接口也可以在列表上工作。见下文。

# add you list of paths here
addrs = ["path1", "path2", ...]

# check if they exists, update the list
for add in addrs:
    try:
        spark.read.format("parquet").load(add)
    except:
        print(add)
        addrs.remove(add)

# read the updated list now
sdf_a = spark\
        .read\
        .format("parquet")\
        .load(addrs)
bkkx9g8r

bkkx9g8r5#

@s510有一个很好的答案,带有一种Python“duck typing”的风格,但是,我更喜欢在可能的情况下使用不变性,所以我将它重写如下:

def path_is_readable(x):
  try:
    spark.read.parquet(x)
    return True
  except:
    return False

valid_paths = [p for p in paths if path_is_readable(p)]
dataframe = spark.read.parquet(*valid_paths)

相关问题