我有一个Hive表分区的时间戳上面的Parquet文件与快速转换。基本上路径看起来像:
s3:/bucketname/project/flowtime=0/
s3:/bucketname/project/flowtime=1/
s3:/bucketname/project/flowtime=2/
...
考虑到这张表,我发现有些不一致。问题是,由于一个字段在某些Parquet模式中给出longtype,在另一个模式中给出string,因此运行查询会抛出classcastexception。
所以我现在要做的是读取我所有的Parquet文件并检查它们的模式,这样我就可以重新创建它们。我想把我的文件名Map到相关Parquet地板的模式。这样我就可以:
filename | schema
s3:/bucketname/project/flowtime |StructField(StructField(Id,StringType,True),
|StructField(Date,StringType,True)
所以我尝试将spark与scala结合使用,并将org.apache.spark.sql.functions的函数输入文件名 Package 在一个udf中。它工作得很好。
val filename = (path: String) => path
val filenameUDF = udf(filename)
val df=sqlContext.parquetFile("s3a://bucketname/").select(filenameUDF(input_file_name())).toDF()
df.map(lines =>(lines.toString,sqlContext.read.parquet(lines.toString.replace("[","").replace("]","")).schema.toString)})
这是为了给出一个rdd[(string,string)],只是看起来在我的Map中读取Parquet的部分有一个nullpointerexception。
ERROR scheduler.TaskSetManager: Task 0 in stage 14.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 35, CONFIDENTIAL-SERVER-NAME, executor 13): java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1888)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1888)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
如果你有任何想法为什么阅读Parquet似乎不工作的Map内,请让我知道为什么,因为这两个部分我想创建(文件名和模式)似乎工作良好,但加入他们不。
另外,如果您有更好的主意如何解决我的Parquet文件之间的不一致性,使我的配置单元表损坏,因为我没有看到其他选择,而不是这样做,因为Parquet是不可变的,更改配置单元元数据不会更改每个文件中嵌入的Parquet元数据。
谢谢你的关注。雷诺
1条答案
按热度按时间rdrgkggo1#
我建议你在你的工作清单上再找一个。
首先,您可以使用liststatus读取和存储s3 bucket名称,然后在每个路径上循环。
希望对你有帮助。
当做。史提芬