How to get the location value of a Hive table using the spark object (SparkSession)?

pxiryf3j · posted 2021-06-27 in Hive

I'm interested in retrieving the location value of a Hive table given a Spark object (SparkSession). One way to get this value is to parse the location out of the output of the following SQL query:

describe formatted <table name>

I was wondering whether there is another way to obtain the location value without having to parse the output. An API would be useful in case the output of the command above changes between Hive versions. If an external dependency is needed, which one would it be? Is there some example Spark code that obtains the location value?


r7s23pms1#

You can also use the `.toDF` method on `desc formatted table` and then filter from the DataFrame.

DataFrame API:

```
scala> :paste
spark.sql("desc formatted data_db.part_table")
  .toDF                             //convert to dataframe will have 3 columns col_name,data_type,comment
  .filter('col_name === "Location") //filter on col_name
  .collect()(0)(1)
  .toString
```

Result:

```
String = hdfs://nn:8020/location/part_table
```

(or) RDD API:

```
scala> :paste
spark.sql("desc formatted data_db.part_table")
  .collect()
  .filter(r => r(0).equals("Location")) //filter on r(0) value
  .map(r => r(1))                       //get only the location
  .mkString                             //convert as string
  .split("8020")(1)                     //change the split based on your namenode port..etc
```

Result:

```
String = /location/part_table
```
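If you want the path without hard-coding the NameNode port in the split, a small variation (just a sketch, still parsing the same `desc formatted` output for the example table `data_db.part_table`) is to parse the value into a `java.net.URI` and take its path:

```
import java.net.URI
import org.apache.spark.sql.functions.col

// full location string, e.g. "hdfs://nn:8020/location/part_table"
val location = spark.sql("desc formatted data_db.part_table")
  .filter(col("col_name") === "Location")
  .collect()(0)
  .getString(1)

// path component only, e.g. "/location/part_table"
val pathOnly = new URI(location).getPath
```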

zd287kbt2#

Use this as a reusable function in a Scala project:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

def getHiveTablePath(tableName: String, spark: SparkSession): String = {
  val sql: String = String.format("desc formatted %s", tableName)
  val result: DataFrame = spark.sql(sql).filter(col("col_name") === "Location")
  result.show(false) // just for debug purposes
  val info: String = result.collect().mkString(",")
  val path: String = info.split(',')(1)
  path
}

The caller would be:

println(getHiveTablePath("src", spark)) // you can prefix schema if you have

Result (I ran this locally, so file:/ appears below; on HDFS, hdfs:// would appear):

+--------+------------------------------------+-------+
|col_name|data_type                           |comment|
+--------+------------------------------------+-------+
|Location|file:/Users/hive/spark-warehouse/src|       |
+--------+------------------------------------+-------+

file:/Users/hive/spark-warehouse/src

cidc1ykv3#

Here is the correct answer:

import org.apache.spark.sql.catalyst.TableIdentifier

lazy val tblMetadata = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName,Some(schema)))
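The returned `CatalogTable` exposes the storage location directly (as a `java.net.URI`) in recent Spark versions, so a minimal usage sketch, with placeholder `schema`/`tableName` values, could look like this:

```
import org.apache.spark.sql.catalyst.TableIdentifier

// placeholder database and table names, just for illustration
val schema = "my_db"
val tableName = "my_table"

val tblMetadata = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier(tableName, Some(schema)))

// location of the table, e.g. hdfs://nn:8020/warehouse/my_db.db/my_table
val location: java.net.URI = tblMetadata.location
println(location)
```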

kse8i1jr4#

First approach
You can use `input_file_name` with a DataFrame.
It will give you the absolute file path of a part file.

spark.read.table("zen.intent_master").select(input_file_name).take(1)

Then extract the table path from it, for example as in the sketch below.
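One way to do that extraction (a rough sketch; note that for a partitioned table the parent of a part file is a partition directory, not the table root):

```
import org.apache.spark.sql.functions.input_file_name
import org.apache.hadoop.fs.Path

// "zen.intent_master" is the example table used in this answer
val partFile = spark.read.table("zen.intent_master")
  .select(input_file_name())
  .take(1)(0)
  .getString(0)

// parent directory of the part file
val tableDir = new Path(partFile).getParent.toString
```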
Second approach
This one is more of a hack, you could say.

package org.apache.spark.sql.hive

import java.net.URI

import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.SparkSession

class TableDetail {
  def getTableLocation(table: String, spark: SparkSession): URI = {
    val sessionState: SessionState = spark.sessionState
    val sharedState: SharedState = spark.sharedState
    val catalog: SessionCatalog = sessionState.catalog
    val sqlParser: ParserInterface = sessionState.sqlParser
    val client = sharedState.externalCatalog match {
      case catalog: HiveExternalCatalog => catalog.client
      case _: InMemoryCatalog => throw new IllegalArgumentException("In Memory catalog doesn't " +
        "support hive client API")
    }

    val idtfr = sqlParser.parseTableIdentifier(table)

    require(catalog.tableExists(idtfr), new IllegalArgumentException(idtfr + " does not exist"))
    val rawTable = client.getTable(idtfr.database.getOrElse("default"), idtfr.table)
    rawTable.location
  }
}
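Calling it could look like this (a sketch, assuming the class above is compiled into your project; `data_db.part_table` is just an example table name):

```
import org.apache.spark.sql.hive.TableDetail

val location: java.net.URI = new TableDetail().getTableLocation("data_db.part_table", spark)
println(location)
```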

nxagd54h5#

Here is how to do it in PySpark:

(spark.sql("desc formatted mydb.myschema")
       .filter("col_name=='Location'")
       .collect()[0].data_type)

wkyowqbh6#

Use the ExternalCatalog:

scala> spark
res15: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4eba6e1f

scala> val metastore = spark.sharedState.externalCatalog
metastore: org.apache.spark.sql.catalyst.catalog.ExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog@24b05292

scala> val location = metastore.getTable("meta_data", "mock").location
location: java.net.URI = hdfs://10.1.5.9:4007/usr/hive/warehouse/meta_data.db/mock
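The same idea outside the spark-shell, with an existence check first (a sketch using the `meta_data`/`mock` database and table from the session above):

```
val metastore = spark.sharedState.externalCatalog

// guard against a missing table before asking for its metadata
if (metastore.tableExists("meta_data", "mock")) {
  val location: java.net.URI = metastore.getTable("meta_data", "mock").location
  println(location)
}
```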
