给出了s3位置序列所需的信息。任何两个位置的差异是表的分区列值。每个Parquet文件夹都有相同的模式。因此,我们需要循环使用相同模式的s3Parquet文件路径序列,并将其保存在scala中的单个Dataframe中。
daupos2t1#
如果您打开这些选项,那么只需递归地加载文件即可。
spark.read.parquet("s3a://path/to/root/")
选项如下。
spark.hive.mapred.supports.subdirectories true spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive true
这可以用在
import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.SparkSession val conf = new SparkConf() .setMaster("local[2]") .setAppName("test") .set("spark.hive.mapred.supports.subdirectories","true") .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true") val spark = SparkSession.builder.config(conf).getOrCreate() val df = spark.read.parquet("s3a://path/to/root/")
ou6hu8tu2#
如果您有一个包含所有要导入的目录的数组,您可以在该数组上迭代,生成一个Dataframe集合,然后将它们合并为一个。试试这样的。
//You have now a collection of dataframes val dataframes = directories.map(dir => spark.read.parquet(dir)) //Let's union them into one val df_union = dataframes.reduce(_ union _)
2条答案
按热度按时间daupos2t1#
如果您打开这些选项,那么只需递归地加载文件即可。
选项如下。
这可以用在
ou6hu8tu2#
如果您有一个包含所有要导入的目录的数组,您可以在该数组上迭代,生成一个Dataframe集合,然后将它们合并为一个。
试试这样的。