获取spark写入hive metastore的所有新分区

fykwrbwg  于 2021-06-24  发布在  Hive
关注(0)|答案(2)|浏览(805)

我有一个Dataframe,我正在使用它插入到一个使用sparksql(使用动态分区)的现有分区配置单元表中。一旦dataframe被写入,我想知道dataframe刚刚在hive中创建的分区是什么。
我可以在dataframe中查询不同的分区,但这需要很长时间,因为它必须启动dataframe的整个沿袭。
我可以在写入配置单元之前保存Dataframe,这样,写入操作和disctinct partition\u column操作就发生在缓存的Dataframe之上。但是我的Dataframe非常大,不想花更多的时间在持久化上。
我知道所有的分区信息都存储在hivemetastore中。spark中是否有任何元存储API可以帮助仅检索已创建的新分区?

cld4siwp

cld4siwp1#

可以使用hivemetastoreclient检索表的分区数据:

import org.apache.hadoop.hive.conf.HiveConf
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
val cli = new HiveMetaStoreClient(hiveConf)

/* Get list of partition values prior to DF insert */
val existingPartitions = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(","))
/* Insert DF contents to table */
df.write.insertInto("<db_name>.<tbl_name>")
/* Fetch list of partition values again, and diff with previous list */
val newPartitions = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(","))
val deltaPartitions = newPartitions.diff(existingPartitions)
wfauudbj

wfauudbj2#

val epochTime = <epoch time before inserting the dataframe>
val partitionName = <Partition Column Name>
df.write.insertInto("<db_name>.<tbl_name>")
val catalogPartitions = spark.sharedState.externalCatalog.listPartitions("<db_name>", "<tbl_name>")
val partitionValues = catalogPartitions.filter(cp => ((cp.parameters.get("transient_lastDdlTime").isDefined && cp.parameters.getOrElse("transient_lastDdlTime", "0").toLong >= epochTime / 1000) || cp.lastAccessTime >= epochTime || cp.createTime >= epochTime) && cp.spec.contains(datePartition)).map(cp => cp.spec.getOrElse(datePartition, "")
      }).toList
processedPartitions = partitionValues.toList

在大多数情况下,lastaccesstime为0。createtime具有创建分隔符的时间。但是在parameters中,我发现了一个新的param.u lastdltime,它包含分区的更新时间戳。更安全的一点是,检查所有三个分区,以获得在给定的epoch时间之后创建或修改的分区。

相关问题