如何在spark中合并小文件,同时写入hiveorc表

f0brbegy  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(704)

我正在从s3读取csv文件,并作为orc写入一个配置单元表。写的时候,它写了很多小文件。我需要合并所有这些文件。我设置了以下属性:

spark.sql("SET hive.merge.sparkfiles = true")
 spark.sql("SET hive.merge.mapredfiles = true")
 spark.sql("SET hive.merge.mapfiles = true")
 spark.sql("set hive.merge.smallfiles.avgsize = 128000000")
 spark.sql("set hive.merge.size.per.task = 128000000")

除了这些配置之外,我还尝试了重新分区(1)和合并(1),它将合并到单个文件中,但它会删除配置单元表并再次创建它。

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);

如果我使用append模式而不是overwrite,它会在每个分区下创建重复文件。

masterFile.repartition(1).write.mode(SaveMode.Append).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);

在这两种情况下,spark作业运行两次,第二次执行失败。

有没有什么方法可以使用repartition/coalesce with append模式而不在每个分区中复制部件文件

kzmpq1sx

kzmpq1sx1#

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>)

.orc()方法将数据作为文件写入,而不接触元信息。所以它无法覆盖配置单元中的表。
如果要覆盖配置单元表中的数据,请使用method.insertinto(配置单元表名),其中配置单元表名是配置单元中表的全名(schema+表名)
根据你的例子

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).insertInto(hiveTableName)

也可以用元数据信息覆盖数据。带有覆盖修饰符的saveastable(hive\u table\u name)也将覆盖metastore中的数据。

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).saveAsTable(hiveTableName)

相关问题