Spark (version=2.2.0)
没有 DirectParquetOutputCommitter
. 作为替代,我可以使用
dataset
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic here
.parquet("s3a://...")
以避免创建 _temporary
上的文件夹 S3
.
一切正常,直到我设定了一个 partitionBy
到我的数据集
dataset
.partitionBy("a", "b")
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic stop working creating _temporary on S3
.parquet("s3a://...")
也尝试过添加,但不起作用
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
与 partitionBy
在spark数据集,它将创建 _temporary
移动文件,这是一个非常缓慢的操作。
是否有其他配置或缺少配置?
2条答案
按热度按时间hgc7kmma1#
备选方案(按推荐+轻松-最好是top的顺序排列):
使用netflix的s3committer:https://github.com/rdblue/s3committer/
写入hdfs,然后复制到s3(例如通过s3distcp)
不要使用partitionby,而是迭代所有分区排列,并将结果动态写入每个分区目录
编写自定义文件提交程序
h22fl7wq2#
hadoop3.1的s3a将内置一个零重命名提交程序(va hadoop-13786)。在那之前,你可以利用它的前身,也就是netflix
注意,“算法2”并不是消除临时目录的神奇步骤,只是在单个任务提交时将任务输出直接重命名为目标。如果目录列表中存在延迟一致性,并且仍然是o(数据),那么仍然容易出错。您不能安全地将v1或v2提交程序直接用于s3,而不是hadoop2.x中提供的s3a连接器