在使用spark 2.1.0和hive 2.1.1的mapr群集上,配置单元表的架构在spark和配置单元之间不同步时遇到问题。
我需要尝试专门针对托管表解决此问题,但非托管/外部表可能会重现此问题。
步骤概述
使用 saveAsTable
将Dataframe保存到给定的表。
使用 mode("overwrite").parquet("path/to/table")
覆盖以前保存的表的数据。实际上,我是通过spark和hive外部的一个进程修改数据的,但这也产生了同样的问题。
使用 spark.catalog.refreshTable(...)
刷新元数据
使用查询表 spark.table(...).show()
. 原始Dataframe和覆盖Dataframe之间相同的任何列都将正确显示新数据,但仅在新表中的任何列都不会显示。
示例
db_name = "test_39d3ec9"
table_name = "overwrite_existing"
table_location = "<spark.sql.warehouse.dir>/{}.db/{}".format(db_name, table_name)
qualified_table = "{}.{}".format(db_name, table_name)
spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(db_name))
另存为托管表
existing_df = spark.createDataFrame([(1, 2)])
existing_df.write.mode("overwrite").saveAsTable(table_name)
请注意,将以下内容保存为非托管表将产生相同的问题:
existing_df.write.mode("overwrite") \
.option("path", table_location) \
.saveAsTable(qualified_table)
查看表的内容
spark.table(table_name).show()
+---+---+
| _1| _2|
+---+---+
| 1| 2|
+---+---+
直接覆盖Parquet文件
new_df = spark.createDataFrame([(3, 4, 5, 6)], ["_4", "_3", "_2", "_1"])
new_df.write.mode("overwrite").parquet(table_location)
用Parquet读取器查看内容,内容显示正确
spark.read.parquet(table_location).show()
+---+---+---+---+
| _4| _3| _2| _1|
+---+---+---+---+
| 3| 4| 5| 6|
+---+---+---+---+
刷新表的spark元数据并作为表重新读入。将更新相同列的数据,但不显示其他列。
spark.catalog.refreshTable(qualified_table)
spark.table(qualified_table).show()
+---+---+
| _1| _2|
+---+---+
| 6| 5|
+---+---+
我还尝试在调用之前更新配置单元中的架构 spark.catalog.refreshTable
在配置单元外壳中使用以下命令:
ALTER TABLE test_39d3ec9.overwrite_existing REPLACE COLUMNS (`_1` bigint, `_2` bigint, `_3` bigint, `_4` bigint);
在运行alter命令之后,我运行descripe,它在hive中正确显示
DESCRIBE test_39d3ec9.overwrite_existing
OK
_1 bigint
_2 bigint
_3 bigint
_4 bigint
在运行alter命令之前,它只显示预期的原始列
DESCRIBE test_39d3ec9.overwrite_existing
OK
_1 bigint
_2 bigint
然后我跑了 spark.catalog.refreshTable
但这并没有影响spark对数据的看法。
附加说明
在spark方面,我使用pyspark进行了大部分测试,但也在spark shell(scala)和sparksql shell中进行了测试。在Spark壳里,我还试着用 HiveContext
但没用。
import org.apache.spark.sql.hive.HiveContext
import spark.sqlContext.implicits._
val hiveObj = new HiveContext(sc)
hiveObj.refreshTable("test_39d3ec9.overwrite_existing")
在hiveshell中执行alter命令后,我在hue中验证了模式在那里也发生了更改。
我还试着用 spark.sql("ALTER ...")
但是我们现在使用的spark版本(2.1.0)不允许使用它,而且看起来它在spark 2.2.0之前是不可用的,基于这个问题:https://issues.apache.org/jira/browse/spark-19261
我还再次阅读了spark文档,特别是本节:https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#hive-metastoreParquet表转换
根据这些文件, spark.catalog.refreshTable
应该有用。的配置 spark.sql.hive.convertMetastoreParquet
通常是 false
,但我把它换成了 true
为了测试,它似乎没有任何影响。
任何帮助都将不胜感激,谢谢!
1条答案
按热度按时间67up9zun1#
在cdh5.11.x包中使用spark2.2.0时,我遇到了类似的问题。
之后
spark.write.mode("overwrite").saveAsTable()
当我发布spark.read.table().show
不会显示任何数据。在检查时,我发现这是一个已知的问题与cdhSpark2.2.0版本。解决方法是在执行saveastable命令后运行下面的命令。
如果你的table位置像hdfs://hdfsha/user/warehouse/example.db/qualified_table
然后分配'path'='hdfs://hdfsha/user/warehouse/example.db/qualified_table'
这对我有用。试试看。我想现在你的问题已经解决了。如果不行,你可以试试这个方法。
变通办法source:https://www.cloudera.com/documentation/spark2/2-2-x/topics/spark2_known_issues.html