外部覆盖后spark和配置单元表架构不同步

2g32fytz  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(455)

在使用spark 2.1.0和hive 2.1.1的mapr群集上,配置单元表的架构在spark和配置单元之间不同步时遇到问题。
我需要尝试专门针对托管表解决此问题,但非托管/外部表可能会重现此问题。

步骤概述

使用 saveAsTable 将Dataframe保存到给定的表。
使用 mode("overwrite").parquet("path/to/table") 覆盖以前保存的表的数据。实际上,我是通过spark和hive外部的一个进程修改数据的,但这也产生了同样的问题。
使用 spark.catalog.refreshTable(...) 刷新元数据
使用查询表 spark.table(...).show() . 原始Dataframe和覆盖Dataframe之间相同的任何列都将正确显示新数据,但仅在新表中的任何列都不会显示。

示例

db_name = "test_39d3ec9"
table_name = "overwrite_existing"
table_location = "<spark.sql.warehouse.dir>/{}.db/{}".format(db_name, table_name)

qualified_table = "{}.{}".format(db_name, table_name)
spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(db_name))

另存为托管表

existing_df = spark.createDataFrame([(1, 2)])
existing_df.write.mode("overwrite").saveAsTable(table_name)

请注意,将以下内容保存为非托管表将产生相同的问题:

existing_df.write.mode("overwrite") \
    .option("path", table_location) \
    .saveAsTable(qualified_table)

查看表的内容

spark.table(table_name).show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
+---+---+

直接覆盖Parquet文件

new_df = spark.createDataFrame([(3, 4, 5, 6)], ["_4", "_3", "_2", "_1"])
new_df.write.mode("overwrite").parquet(table_location)

用Parquet读取器查看内容,内容显示正确

spark.read.parquet(table_location).show()
+---+---+---+---+
| _4| _3| _2| _1|
+---+---+---+---+
|  3|  4|  5|  6|
+---+---+---+---+

刷新表的spark元数据并作为表重新读入。将更新相同列的数据,但不显示其他列。

spark.catalog.refreshTable(qualified_table)
spark.table(qualified_table).show()
+---+---+
| _1| _2|
+---+---+
|  6|  5|
+---+---+

我还尝试在调用之前更新配置单元中的架构 spark.catalog.refreshTable 在配置单元外壳中使用以下命令:

ALTER TABLE test_39d3ec9.overwrite_existing REPLACE COLUMNS (`_1` bigint, `_2` bigint, `_3` bigint, `_4` bigint);

在运行alter命令之后,我运行descripe,它在hive中正确显示

DESCRIBE test_39d3ec9.overwrite_existing
OK
_1                      bigint
_2                      bigint
_3                      bigint
_4                      bigint

在运行alter命令之前,它只显示预期的原始列

DESCRIBE test_39d3ec9.overwrite_existing
OK
_1                      bigint
_2                      bigint

然后我跑了 spark.catalog.refreshTable 但这并没有影响spark对数据的看法。

附加说明

在spark方面,我使用pyspark进行了大部分测试,但也在spark shell(scala)和sparksql shell中进行了测试。在Spark壳里,我还试着用 HiveContext 但没用。

import org.apache.spark.sql.hive.HiveContext
import spark.sqlContext.implicits._
val hiveObj = new HiveContext(sc)
hiveObj.refreshTable("test_39d3ec9.overwrite_existing")

在hiveshell中执行alter命令后,我在hue中验证了模式在那里也发生了更改。
我还试着用 spark.sql("ALTER ...") 但是我们现在使用的spark版本(2.1.0)不允许使用它,而且看起来它在spark 2.2.0之前是不可用的,基于这个问题:https://issues.apache.org/jira/browse/spark-19261
我还再次阅读了spark文档,特别是本节:https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#hive-metastoreParquet表转换
根据这些文件, spark.catalog.refreshTable 应该有用。的配置 spark.sql.hive.convertMetastoreParquet 通常是 false ,但我把它换成了 true 为了测试,它似乎没有任何影响。
任何帮助都将不胜感激,谢谢!

67up9zun

67up9zun1#

在cdh5.11.x包中使用spark2.2.0时,我遇到了类似的问题。
之后 spark.write.mode("overwrite").saveAsTable() 当我发布 spark.read.table().show 不会显示任何数据。
在检查时,我发现这是一个已知的问题与cdhSpark2.2.0版本。解决方法是在执行saveastable命令后运行下面的命令。

spark.sql("ALTER TABLE qualified_table set SERDEPROPERTIES ('path'='hdfs://{hdfs_host_name}/{table_path}')")

spark.catalog.refreshTable("qualified_table")

如果你的table位置像hdfs://hdfsha/user/warehouse/example.db/qualified_table
然后分配'path'='hdfs://hdfsha/user/warehouse/example.db/qualified_table'
这对我有用。试试看。我想现在你的问题已经解决了。如果不行,你可以试试这个方法。
变通办法source:https://www.cloudera.com/documentation/spark2/2-2-x/topics/spark2_known_issues.html

相关问题