Merging small files in Hive on Spark

q1qsirdb asked on 2021-06-24 in Hive

I want to merge the output into files of about 128 MB each in Hive. In Spark I set the following properties, but it still doesn't work. Can anyone give me a suggestion?

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("MyExample")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

spark.sqlContext.setConf("hive.mapred.supports.subdirectories", "true")
spark.sqlContext.setConf("mapreduce.input.fileinputformat.input.dir.recursive", "true")
spark.sqlContext.setConf("hive.hadoop.supports.splittable.combineinputformat", "true")
spark.sqlContext.setConf("hive.exec.compress.output", "false")
spark.sqlContext.setConf("hive.input.format", "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat")
spark.sqlContext.setConf("hive.merge.mapfiles", "true")
spark.sqlContext.setConf("hive.merge.mapredfiles", "true")
spark.sqlContext.setConf("hive.merge.size.per.task", "128000000")
spark.sqlContext.setConf("hive.merge.smallfiles.avgsize", "128000000")
spark.sqlContext.setConf("hive.groupby.skewindata", "true")
spark.sqlContext.setConf("hive.merge.sparkfiles", "true")

val df = spark.read.format("csv")
  .option("header", "false")
  .load(path)
df.write.format("csv").saveAsTable("test_table")

dluptydi #1

You can estimate or compute the size of the DataFrame, as described in the post "How to find Spark RDD/DataFrame size?", and then do:

val nPartitions = math.ceil(sizeInMB / 128.0).toInt
df.repartition(nPartitions).write.format(....).saveAsTable(...)
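
For reference, here is a minimal end-to-end sketch of that approach. It assumes Spark 2.3+, where a DataFrame's estimated size is exposed through the Catalyst plan statistics (df.queryExecution.optimizedPlan.stats.sizeInBytes); the input path and object name are placeholders, and the statistic is only an estimate, so the resulting partition count is approximate:

import org.apache.spark.sql.SparkSession

object MergeToTargetFileSize {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("MergeToTargetFileSize")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical input path; replace with your own.
    val path = "/data/input/csv"

    val df = spark.read.format("csv")
      .option("header", "false")
      .load(path)

    // Catalyst's plan statistics report an estimated output size in bytes
    // (Spark 2.3+). This is an estimate, not the exact on-disk size.
    val sizeInBytes = df.queryExecution.optimizedPlan.stats.sizeInBytes
    val sizeInMB = sizeInBytes.toDouble / (1024 * 1024)

    // Aim for roughly 128 MB per output file; never go below one partition.
    val nPartitions = math.max(1, math.ceil(sizeInMB / 128.0).toInt)

    df.repartition(nPartitions)
      .write
      .format("csv")
      .saveAsTable("test_table")

    spark.stop()
  }
}

If the estimate is unreliable for your source (plan statistics for raw CSV reads are often rough), an alternative is to sum the input file sizes with Hadoop's FileSystem API and derive sizeInMB from that.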
