在Spark(3.2.0)应用中,我需要为写入HDFS的不同文件更改复制因子。例如,我写一些临时文件,希望它们的复制因子为1。然后,我写一些持久文件,希望它们的复制因子为2,有时为3。
然而,正如我测试; SparkContext.hadoopConfiguration
中的dfs.replication
完全不影响文件的复制因子,而spark.hadoop.dfs.replication
仅在使用先前定义的SparkConf
创建SparkSession
时设置该因子(或更改在HDFS端设置的默认复制),如下所示。
val conf = new SparkConf()
conf.set("spark.hadoop.dfs.replication", "1")) // works but cannot be changed later.
val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
在文档中做了一些搜索之后,我发现Spark 3.0中添加到core conf中的配置spark.sql.legacy.setCommandRejectsSparkCoreConfs
,默认设置为true
,要更改其他一些core conf,需要在创建SparkSession
时显式设置为false
。即使我这样做了,并防止了类似org.apache.spark.sql.AnalysisException: Cannot modify the value of a Spark config
的错误,通过在如下函数中设置两个配置,将复制因子设置为不同的值
def setReplicationFactor(rf: Short): Unit = {
val activeSparkSession = SparkSession.getActiveSession.get
activeSparkSession.conf.set("spark.hadoop.dfs.replication", rf.toString)
activeSparkSession.sparkContext.hadoopConfiguration.set("dfs.replication", rf.toString)
}
不会变更以更新的SparkConf
和SparkContext.hadoopConfiguration
写入的档案。
是否有办法在同一个spark会话中使用不同的复制因子将文件写入HDFS?
1条答案
按热度按时间tez616oj1#
完全可以在每个文件/文件夹的基础上完成。但是你需要使用Hadoop工具。
剩余呼叫:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
也有命令行选项,但我认为WebHDFS更干净。