我与aws合作,我有使用spark和hive的工作流。我的数据是按日期划分的,所以每天我都在s3存储器中有一个新的分区。我的问题是,有一天加载数据失败,我必须重新执行该分区。接下来是编写的代码:
df // My data in a Dataframe
.write
.format(getFormat(target)) // csv by default, but could be parquet, ORC...
.mode(getSaveMode("overwrite")) // Append by default, but in future it should be Overwrite
.partitionBy(partitionName) // Column of the partition, the date
.options(target.options) // header, separator...
.option("path", target.path) // the path where it will be storage
.saveAsTable(target.tableName) // the table name
在我的流程中发生了什么?如果使用savemode.overwrite,则整个表将被删除,并且只保存分区。如果我使用savemode.append,我可能会有重复的数据。
搜索时,我发现hive支持这种覆盖,只支持分区,但是使用hql语句,我不支持´我没有。
我们需要解决Hive的问题,这样我们才能´t使用此替代选项(直接到csv)。
我找到了这张jira的票,它可以解决我所遇到的问题´我有,但在spark的最后一个版本(2.3.0)中,情况是一样的。它删除整个表并保存分区,而不是覆盖我的数据所在的分区。
为了更清楚地说明这一点,这是一个例子:
由一个
数据:
| A | B | C |
|---|---|---|
| b | 1 | 2 |
| c | 1 | 2 |
表格:
| A | B | C |
|---|---|---|
| a | 1 | 2 |
| b | 5 | 2 |
我想要的是:在表中,分区 a
待在table上,分区 b
用数据覆盖,并添加分区 c
. 有没有什么办法可以用spark来解决这个问题?
我的最后一个选择是首先删除要保存的分区,然后使用savemode.append,但如果没有其他解决方案,我会尝试这样做。
4条答案
按热度按时间kxkpmulp1#
再加上wandermonk@提到的,
动态分区插入仅在sql模式下受支持(对于insert overwrite table sql语句)。对于非基于文件的数据源(即insertablerelations),不支持动态分区插入。
对于动态分区插入,overwrite关键字的行为由spark.sql.sources.partitionoverwritemode配置属性(默认值:static)控制。属性控制spark是否应删除所有与分区规范匹配的分区,而不管是否有数据要写入(静态)或仅删除将有数据写入的分区(动态)。
当启用动态覆盖模式时,spark将只删除有数据要写入的分区。所有其他分区保持完整。
从
从使用spark写入动态分区(https://medium.com/a-muggles-pensieve/writing-into-dynamic-partitions-using-spark-2e2b818a007a)
spark现在可以像hive一样编写分区数据 — 这意味着只有insert查询所触及的分区会被覆盖,其他分区不会被触及。
g2ieeal72#
因此,如果您使用的是spark版本<2.3,并且希望在不删除其他分区的情况下动态写入分区,那么可以实现以下解决方案。
其思想是将数据集注册为一个表,然后使用spark.sql()运行insert查询。
atmip9wb3#
如果您使用的是spark 2.3.0,请尝试设置
spark.sql.sources.partitionOverwriteMode
设置为dynamic
,需要对数据集进行分区,并覆盖写入模式。ljsrvy3e4#
我建议使用sparksession运行sql。您可以通过从现有数据集中选择列来运行“插入覆盖分区查询”。这个解决方案肯定只会覆盖分区。