How to truncate data and drop all partitions from a Hive table using Spark

c90pui9n · asked 2021-06-24 · in Hive

How do I delete all data and drop all partitions from a Hive table in a database, using Spark 2.3.0?

```
truncate table my_table; // Deletes all data, but keeps partitions in metastore

alter table my_table drop partition(p_col > 0) // does not work from spark
```

The only approach that works for me is to iterate over `show partitions my_table`, replace the `/` separators with `,`, and drop each partition one by one. But there must be a cleaner way. (The partition columns are of type `string`.) Any suggestions?
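What I run today is roughly the following (only a sketch; `my_table` and the quoting of partition values are illustrative):

```
// List every partition, rebuild a DROP PARTITION spec from the path-style
// string, and drop the partitions one at a time.
// SHOW PARTITIONS returns rows like "p_col=2021-06-24/other_col=a".
spark.sql("show partitions my_table")
  .collect()
  .map(_.getString(0))
  .foreach { p =>
    // "p_col=2021-06-24/other_col=a" -> "p_col='2021-06-24',other_col='a'"
    val spec = p.split("/").map { kv =>
      val Array(k, v) = kv.split("=", 2)
      s"$k='$v'"
    }.mkString(",")
    spark.sql(s"alter table my_table drop if exists partition ($spec)")
  }
```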

fnvucqvd1#

Hive has two types of tables: managed and external. A managed table is one where Hive manages the whole schema and the data, so dropping a managed table deletes the schema, the metadata, and the data. The data of an external table, on the other hand, lives somewhere else (for example in an external source such as S3), so dropping the table only removes the table and its metadata; the data stays untouched at the source.
In your case, when you truncate the table, Hive keeps the metastore entries because the table still exists in Hive; only the data is deleted. The metastore itself holds no data, it only contains the schema and other related table details.
I hope this clears it up to some extent.
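If you want to confirm this on your side, a quick check along these lines should do it (just a sketch; `my_table` is a placeholder for your table):

```
// After TRUNCATE the rows are gone, but the partitions are still registered
// in the metastore.
spark.sql("truncate table my_table")
spark.table("my_table").count                 // 0 rows
spark.sql("show partitions my_table").show()  // partitions still listed

// The table type (MANAGED vs EXTERNAL) shows up in the formatted description.
spark.sql("describe formatted my_table").filter("col_name = 'Type'").show()
```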
Edit 1:
Similar post


lskq00tm2#

Let's set the problem up using Spark 2.4.3:

```
// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")

// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")

// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))

// Verify inserts
spark.table("potato").count // 9 records
```

We use the external catalog's `listPartitions` and `dropPartitions` functions.

```
// Get External Catalog
val catalog = spark.sharedState.externalCatalog

// Get the spec from the list of all partitions
val partitions = catalog.listPartitions("default", "potato").map(_.spec)

// We pass them to the Catalog's dropPartitions function.
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", partitions,
  ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
```

This all works fine for a `MANAGED` table, but what about an `EXTERNAL` table?

```
// We repeat the setup above, but this time after creating an EXTERNAL table.
// After dropping, the partitions appear to be gone (or are they?).
catalog.listPartitions("default", "potato").length // 0 partitions

// BUT repairing the table simply adds them again; the partitions/data
// were NOT deleted from the underlying filesystem. This is not what we wanted!
spark.sql("MSCK REPAIR TABLE potato")
catalog.listPartitions("default", "potato").length // 9 partitions again!
```

To get around this, we change the table from `EXTERNAL` to `MANAGED` before dropping the partitions.

```
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// Identify the table in question
val identifier = TableIdentifier("potato", Some("default"))

// Get its current metadata (getTableMetadata lives on the session catalog)
val tableMetadata = spark.sessionState.catalog.getTableMetadata(identifier)

// Clone the metadata while changing the tableType to MANAGED
val alteredMetadata = tableMetadata.copy(tableType = CatalogTableType.MANAGED)

// Alter the table using the new metadata
spark.sessionState.catalog.alterTable(alteredMetadata)

// Now drop!
catalog.dropPartitions("default", "potato", partitions,
  ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
spark.sql("MSCK REPAIR TABLE potato") // Won't add anything
catalog.listPartitions("default", "potato").length // Still 0 partitions!
```

Don't forget to change the table back to `EXTERNAL` afterwards, using `CatalogTableType.EXTERNAL`.
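For completeness, that last step could look something like this (a sketch that reuses `identifier` from the snippet above):

```
// Restore the original table type once the partitions are gone.
val restoredMetadata = spark.sessionState.catalog
  .getTableMetadata(identifier)
  .copy(tableType = CatalogTableType.EXTERNAL)
spark.sessionState.catalog.alterTable(restoredMetadata)
```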
