在spark 2.0中,您可以轻松地从hive数据仓库读取数据,也可以向hive表中写入和追加数据。
本文将使用python编程语言实现:
从现有的hive表中创建DataFrame
将dataframe保存到一个新的hive表
通过insert语句和append write模式向现有的hive表追加数据。
首先,创建一个支持hive的sparksession,运行以下代码创建一个支持hive的spark会话:
from pyspark.sql import SparkSession
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive supported.
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.enableHiveSupport() \
.getOrCreate()
然后从hive读取数据,现在我们可以使用sparksession对象从hive数据库读取数据:
# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()
我使用derby作为hive metastore,并且我已经在test_db数据库上创建了一个名为test_table的表。在表内部,有两条记录。
结果如下所示:
+---+-----+
| id|value|
+---+-----+
| 1| ABC|
| 2| DEF|
+---+-----+
添加一个新列
# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()
结果如下:
+---+-----+---------+
| id|value|NewColumn|
+---+-----+---------+
| 1| ABC| Test|
| 2| DEF| Test|
+---+-----+---------+
将Dataframe保存为一个新的hive表
使用下面的代码将Dataframe保存到一个新的名为test_table2的hive表中:
# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()
在日志中,我可以看到新表默认保存为parquet:
使用catalyst模式初始化拼花writesupport:
{
"type" : "struct",
"fields" : [ {
"name" : "id",
"type" : "long",
"nullable" : true,
"metadata" : { }
}, {
"name" : "value",
"type" : "string",
"nullable" : true,
"metadata" : {
"HIVE_TYPE_STRING" : "varchar(100)"
}
}, {
"name" : "NewColumn",
"type" : "string",
"nullable" : false,
"metadata" : { }
} ]
}
对应的拼花消息类型:
message spark_schema {
optional int64 id;
optional binary value (UTF8);
required binary NewColumn (UTF8);
}
将数据追加到现有的hive表
您还可以通过“insert sql statement”或“append”写入模式向现有的hive表追加数据。
# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()
这两条记录都被成功插入到表中,如下面的输出所示:
+---+-----+--------------------+
| id|value| NewColumn|
+---+-----+--------------------+
| 4| JKL|Spark Write Appen...|
| 1| ABC| Test|
| 2| DEF| Test|
| 3| GHI| SQL INSERT|
+---+-----+--------------------+
完整的代码如下: example.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive supported.
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.enableHiveSupport() \
.getOrCreate()
# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()
# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()
# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()
# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()
在spark玩得开心!
内容来源于网络,如有侵权,请联系作者删除!