我有一个spark任务,它在hive中创建一个结果表,并从其他表加载数据。这个作业每天都在执行,并在hdfs上生成大量文件。当我使用hive时,我编写了一个脚本:alter table quality \u of \u service \u 1 concatenate;
implicit val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.appName("Test")
.getOrCreate()
def build(implicit session: SparkSession, config: Config): Unit = {
import session.implicits._
loadData
.flatMap(Item.buildInternal(_, config))
.write
.mode(SaveMode.Overwrite)
.format("orc")
.saveAsTable(s"${config.schema}.result_table")
}
你能写下如何合并这些文件和它的调子吗?带hdfs、spark或spark会话配置的shell脚本。
配置:
--deploy-mode cluster \
--conf spark.rpc.message.maxSize=300 \
--conf spark.rdd.compress=true \
--conf spark.default.parallelism=1009 \
--conf spark.sql.shuffle.partitions=1009 \
--conf spark.sql.autoBroadcastJoinThreshold=31457280 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=1200 \
--conf spark.dynamicAllocation.minExecutors=400 \
--conf spark.dynamicAllocation.maxExecutors=1200 \
--conf spark.sql.files.maxPartitionBytes=1073741824 \
--executor-cores 3 \
--executor-memory 7g \
--driver-memory 4g \
1条答案
按热度按时间46qrfjad1#
在spark2中,一个参数控制一个分区的大小。。。因此,“保存”时的文件数。因此,增加参数以拥有更少的分区和更少的文件(例如,每个分区一个千兆字节)
val maxSplit=1024*1024*1024 spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)