使用spark从s3加载嵌套csv文件

kx7yvsdv  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(330)

我有数百个gzip csv文件在s3中,我试图加载。目录结构类似于以下内容:

bucket
-- level1
---- level2.1
-------- level3.1
------------ many files 
-------- level3.2
------------ many files 
---- level2.2
-------- level3.1
------------ many files 
-------- level3.2
------------ many files

每个目录下可能有几个level2、level3目录和许多文件。过去,我使用.textfile加载数据,并使用通配符传递路径,如:

s3a://bucketname/level1/**

它可以很好地加载所有子路径下的所有文件。我现在正在尝试使用spark 2中的csv加载机制,不断出现以下错误:

java.lang.IllegalArgumentException: Can not create a Path from an empty string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:126)
at org.apache.hadoop.fs.Path.<init>(Path.java:134)
at org.apache.hadoop.util.StringUtils.stringToPath(StringUtils.java:245)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:377)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$30.apply(SparkContext.scala:1014)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$30.apply(SparkContext.scala:1014)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
at scala.Option.foreach(Option.scala:257)

我尝试使用以下路径:
s3a://bucketname/level1级/**
s3a://bucketname/level1级/
s3a://bucketname/level1级
所有的结果都是相同的错误。如果我使用**s3a://bucketname/level1/level2.1/level3.1/**加载该目录下的所有文件,但如果我尝试使用更高级别的目录,则失败。
我要加载的代码是:

Dataset<Row> csv = sparkSession.read()
            .option("delimiter", parseSettings.getDelimiter().toString())
            .option("quote", parseSettings.getQuote())
            .csv(path);

虽然csv加载使用的是sparks正常的文件解析策略,但其行为似乎不同于使用textfile,有没有一种方法可以实现用csv格式加载所有文件?
谢谢,
内森

57hvy0tb

57hvy0tb1#

听起来像个虫子。
这意味着:在issue.apache.org中搜索错误消息和堆栈跟踪。fwiw,可能是spark-15473。如果它在那里&还没有,把它和你的堆栈一起添加;如果没有新的东西。
第一:将其与s3输入隔离;尝试用file://url复制它。这将有助于把责任指向正确的代码。
还有,变通时间。databricks csv阅读器仍然可以工作

相关问题