使用parquet文件元数据创建配置单元表

brqmpdu1  于 2022-11-05  发布在  Hive
关注(0)|答案(6)|浏览(248)

我写了一个DataFrame作为parquet文件。而且,我想使用Hive读取文件,使用parquet的元数据。
编写parquet write的输出

_common_metadata  part-r-00000-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  part-r-00002-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  _SUCCESS
_metadata         part-r-00001-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  part-r-00003-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet

配置单元表

CREATE  TABLE testhive
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  '/home/gz_files/result';

FAILED: SemanticException [Error 10043]: Either list of columns or a custom serializer should be specified

我如何推断从 parquet 文件的 meta数据?
如果我打开_common_metadata我下面有内容,

PAR1LHroot
%TSN%
%TS%
%Etype%
)org.apache.spark.sql.parquet.row.metadata▒{"type":"struct","fields":[{"name":"TSN","type":"string","nullable":true,"metadata":{}},{"name":"TS","type":"string","nullable":true,"metadata":{}},{"name":"Etype","type":"string","nullable":true,"metadata":{}}]}

或者如何解析 meta数据文件?

xxls0lw8

xxls0lw81#

下面是我提出的一个解决方案,用于从 parquet 文件中获取元数据,以便创建一个Hive表。
首先启动一个spark-shell(或者将其全部编译到一个Jar中,然后使用spark-submit运行它,但是shell要简单得多)

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame

val df=sqlContext.parquetFile("/path/to/_common_metadata")

def creatingTableDDL(tableName:String, df:DataFrame): String={
  val cols = df.dtypes
  var ddl1 = "CREATE EXTERNAL TABLE "+tableName + " ("
  //looks at the datatypes and columns names and puts them into a string
  val colCreate = (for (c <-cols) yield(c._1+" "+c._2.replace("Type",""))).mkString(", ")
  ddl1 += colCreate + ") STORED AS PARQUET LOCATION '/wherever/you/store/the/data/'"
  ddl1
}

val test_tableDDL=creatingTableDDL("test_table",df,"test_db")

它将为您提供配置单元将用于每列的数据类型,因为它们存储在Parquet中。例如:CREATE EXTERNAL TABLE test_table (COL1 Decimal(38,10), COL2 String, COL3 Timestamp) STORED AS PARQUET LOCATION '/path/to/parquet/files'

unftdfkk

unftdfkk2#

我只想扩展一下James Tobin的答案,有一个StructField类,它提供了Hive的数据类型,而不需要进行字符串替换。

// Tested on Spark 1.6.0.

import org.apache.spark.sql.DataFrame

def dataFrameToDDL(dataFrame: DataFrame, tableName: String): String = {
    val columns = dataFrame.schema.map { field =>
        "  " + field.name + " " + field.dataType.simpleString.toUpperCase
    }

    s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
}

这样就解决了IntegerType问题。

scala> val dataFrame = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("x", "y")
dataFrame: org.apache.spark.sql.DataFrame = [x: int, y: string]

scala> print(dataFrameToDDL(dataFrame, "t"))
CREATE TABLE t (
  x INT,
  y STRING
)

这应该适用于任何DataFrame,而不仅仅适用于Parquet。(例如,我将其用于JDBC DataFrame。)
另外,如果目标DDL支持可为空的列,则可以通过选中StructField.nullable来扩展该函数。

toiithl6

toiithl63#

对维克托进行了小的改进(在www.example.com上添加了引用field.name),并修改为将table绑定到本地 parquet 文件(在spark 1.6.1上测试)

def dataFrameToDDL(dataFrame: DataFrame, tableName: String, absFilePath: String): String = {
    val columns = dataFrame.schema.map { field =>
      "  `" + field.name + "` " + field.dataType.simpleString.toUpperCase
    }
    s"CREATE EXTERNAL TABLE $tableName (\n${columns.mkString(",\n")}\n) STORED AS PARQUET LOCATION '"+absFilePath+"'"
  }

还要注意:

  • 由于SQLContext不支持创建外部表,因此需要HiveContext。
  • parquet 文件夹的路径必须是绝对路径
eqfvzcg8

eqfvzcg84#

我想扩大詹姆斯回答,
下面的代码适用于所有数据类型,包括ARRAY、MAP和STRUCT。
已在SPARK 2.2中测试

val df=sqlContext.parquetFile("parquetFilePath")
val schema = df.schema
var columns = schema.fields
var ddl1 = "CREATE EXTERNAL TABLE " tableName + " ("
val cols=(for(column <- columns) yield column.name+" "+column.dataType.sql).mkString(",")
ddl1=ddl1+cols+" ) STORED AS PARQUET LOCATION '/tmp/hive_test1/'"
spark.sql(ddl1)
pes8fvy9

pes8fvy95#

我也有同样的问题。不过从实际Angular 来看可能很难实现,因为Parquet支持模式演化:
http://www.cloudera.com/content/www/en-us/documentation/archive/impala/2-x/2-0-x/topics/impala_parquet.html#parquet_schema_evolution_unique_1
例如,您可以向表中添加一个新列,而不必接触表中已有的数据。只有新的数据文件才会有新的元数据(与以前的版本兼容)。
从Spark 1.5.0开始,默认情况下模式合并是关闭的,因为这是一个“相对昂贵的操作”http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging所以推断最新的模式可能并不像听起来那么简单。

$ parquet-tools schema /home/gz_files/result/000000_0
4zcjmb1e

4zcjmb1e6#

其实 Impala 支持

CREATE TABLE LIKE PARQUET

(no共列截面):
https://docs.cloudera.com/runtime/7.2.15/impala-sql-reference/topics/impala-create-table.html
你的问题的标签有“hive”和“spark”,我没有看到这是在Hive中实现的,但如果你使用CDH,它可能是你正在寻找的。

相关问题