将spark dataframe保存为配置单元中的动态分区表

wlp8pajw  于 2021-05-29  发布在  Hadoop
关注(0)|答案(6)|浏览(555)

我有一个示例应用程序,用于将csv文件读入Dataframe。可以使用该方法将Dataframe以Parquet格式存储到配置单元表中 df.saveAsTable(tablename,mode) .
上面的代码工作得很好,但是我每天都有太多的数据,所以我想基于creationdate(表中的列)动态划分hive表。
是否有任何方法可以动态划分Dataframe并将其存储到配置单元仓库中。要避免使用硬编码insert语句吗 hivesqlcontext.sql(insert into table partittioin by(date)....) .
这个问题可以看作是:如何将Dataframe直接保存到配置单元?
非常感谢您的帮助。

ddrv8njm

ddrv8njm1#

我也面临同样的问题,但使用以下技巧我解决了。
当我们对任何表进行分区时,分区列就变得区分大小写。
分区列应以相同的名称出现在Dataframe中(区分大小写)。代码:

var dbName="your database name"
var finaltable="your table name"

// First check if table is available or not..
if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
     //If table is not available then it will create for you..
     println("Table Not Present \n  Creating table " + finaltable)
     sparkSession.sql("use Database_Name")
     sparkSession.sql("SET hive.exec.dynamic.partition = true")
     sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
     sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
     sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID        string,EMP_Name          string,EMP_Address               string,EMP_Salary    bigint)  PARTITIONED BY (EMP_DEP STRING)")
     //Table is created now insert the DataFrame in append Mode
     df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
}
sqxo8psd

sqxo8psd2#

我可以使用 df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table") 我必须启用以下属性才能使其工作。

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
lp0sw83n

lp0sw83n3#

这对我使用python和spark2.1.0是有效的。
不知道这是不是最好的方法,但它是有效的。。。


# WRITE DATA INTO A HIVE TABLE

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

### CREATE HIVE TABLE (with one row)

spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")

### 

### CREATE NON HIVE TABLE (with one row)

spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")

### 

###  ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE

spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")

spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
nr9pn0ug

nr9pn0ug4#

可以在上配置 SparkSession 这样:

spark = SparkSession \
    .builder \
    ...
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

或者可以将它们添加到.properties文件中
这个 spark.hadoop spark配置需要前缀(至少在2.4中是这样),下面是spark如何设置此配置:

/**
   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
   * configuration without the spark.hadoop. prefix.
   */
  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
  }
3hvapo4f

3hvapo4f5#

这就是我的工作。我设置这些设置,然后将数据放在分区表中。

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", 
"nonstrict")
6yjfywim

6yjfywim6#

我相信它是这样工作的: df 是包含年、月和其他列的Dataframe

df.write.partitionBy('year', 'month').saveAsTable(...)

df.write.partitionBy('year', 'month').insertInto(...)

相关问题