使用spark覆盖hive分区

kd3sttzy  于 2021-05-31  发布在  Hadoop
关注(0)|答案(4)|浏览(459)

我与aws合作,我有使用spark和hive的工作流。我的数据是按日期划分的,所以每天我都在s3存储器中有一个新的分区。我的问题是,有一天加载数据失败,我必须重新执行该分区。接下来是编写的代码:

df                            // My data in a Dataframe
  .write
  .format(getFormat(target))  // csv by default, but could be parquet, ORC...
  .mode(getSaveMode("overwrite"))  // Append by default, but in future it should be Overwrite
  .partitionBy(partitionName) // Column of the partition, the date
  .options(target.options)    // header, separator...
  .option("path", target.path) // the path where it will be storage
  .saveAsTable(target.tableName)  // the table name

在我的流程中发生了什么?如果使用savemode.overwrite,则整个表将被删除,并且只保存分区。如果我使用savemode.append,我可能会有重复的数据。
搜索时,我发现hive支持这种覆盖,只支持分区,但是使用hql语句,我不支持´我没有。
我们需要解决Hive的问题,这样我们才能´t使用此替代选项(直接到csv)。
我找到了这张jira的票,它可以解决我所遇到的问题´我有,但在spark的最后一个版本(2.3.0)中,情况是一样的。它删除整个表并保存分区,而不是覆盖我的数据所在的分区。
为了更清楚地说明这一点,这是一个例子:
由一个
数据:

| A | B | C | 
|---|---|---| 
| b | 1 | 2 | 
| c | 1 | 2 |

表格:

| A | B | C | 
|---|---|---| 
| a | 1 | 2 | 
| b | 5 | 2 |

我想要的是:在表中,分区 a 待在table上,分区 b 用数据覆盖,并添加分区 c . 有没有什么办法可以用spark来解决这个问题?
我的最后一个选择是首先删除要保存的分区,然后使用savemode.append,但如果没有其他解决方案,我会尝试这样做。

kxkpmulp

kxkpmulp1#

再加上wandermonk@提到的,
动态分区插入仅在sql模式下受支持(对于insert overwrite table sql语句)。对于非基于文件的数据源(即insertablerelations),不支持动态分区插入。
对于动态分区插入,overwrite关键字的行为由spark.sql.sources.partitionoverwritemode配置属性(默认值:static)控制。属性控制spark是否应删除所有与分区规范匹配的分区,而不管是否有数据要写入(静态)或仅删除将有数据写入的分区(动态)。
当启用动态覆盖模式时,spark将只删除有数据要写入的分区。所有其他分区保持完整。

从使用spark写入动态分区(https://medium.com/a-muggles-pensieve/writing-into-dynamic-partitions-using-spark-2e2b818a007a)
spark现在可以像hive一样编写分区数据 — 这意味着只有insert查询所触及的分区会被覆盖,其他分区不会被触及。

g2ieeal7

g2ieeal72#

因此,如果您使用的是spark版本<2.3,并且希望在不删除其他分区的情况下动态写入分区,那么可以实现以下解决方案。
其思想是将数据集注册为一个表,然后使用spark.sql()运行insert查询。

// Create SparkSession with Hive dynamic partitioning enabled
val spark: SparkSession =
    SparkSession
        .builder()
        .appName("StatsAnalyzer")
        .enableHiveSupport()
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .getOrCreate()
// Register the dataframe as a Hive table
impressionsDF.createOrReplaceTempView("impressions_dataframe")
// Create the output Hive table
spark.sql(
    s"""
      |CREATE EXTERNAL TABLE stats (
      |   ad            STRING,
      |   impressions   INT,
      |   clicks        INT
      |) PARTITIONED BY (country STRING, year INT, month INT, day INT)
      |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
    """.stripMargin
)
// Write the data into disk as Hive partitions
spark.sql(
    s"""
      |INSERT OVERWRITE TABLE stats 
      |PARTITION(country = 'US', year = 2017, month = 3, day)
      |SELECT ad, SUM(impressions), SUM(clicks), day
      |FROM impressions_dataframe
      |GROUP BY ad
    """.stripMargin
)
atmip9wb

atmip9wb3#

如果您使用的是spark 2.3.0,请尝试设置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic ,需要对数据集进行分区,并覆盖写入模式。

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
ljsrvy3e

ljsrvy3e4#

我建议使用sparksession运行sql。您可以通过从现有数据集中选择列来运行“插入覆盖分区查询”。这个解决方案肯定只会覆盖分区。

相关问题