Decimal value corrupted when querying a Hive table on Parquet from Spark

gpnt7bae asked on 2021-06-24 in Hive

When querying an external Hive table stored as Parquet from Spark code via Spark SQL, Spark returns garbage/incorrect values for the decimal field.
In my application flow, a Spark process first writes the data for these Parquet files directly into HDFS, where an external Hive table is defined on top of them. When a second Spark process then tries to consume the data from the Hive table via Spark SQL, it gets back incorrect data.
Steps to reproduce: here is a simple demo that reproduces the problem.
Write to Parquet: I write the data to a Parquet file in HDFS; Spark itself infers Decimal(28,26) as the type of the decimal field.

scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,false), StructField(value,DecimalType(28,26),false))
scala> df.show
+-----+--------------------+
| name|               value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
scala> df.write.mode("overwrite").parquet("/my/hdfs/location/test")

Read the Parquet file back: to check that the value was written correctly.

scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("/my/hdfs/location/test")
df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df_parq.show
+-------+--------------------+
|   name|               value|
+-------+--------------------+
|  dummy|10.70000000000000...|
+-------+--------------------+

Create an external Hive table: on top of the Parquet location, with the decimal field declared as Decimal(18,6).

hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED AS PARQUET LOCATION '/my/hdfs/location/test';

Run the Hive query in beeline: to verify that the returned data is correct.

hive> select * from db1.test_precision;
+----------------------+-----------------------+--+
| test_precision.name  | test_precision.value  |
+----------------------+-----------------------+--+
| dummy                | 10.7                  |
+----------------------+-----------------------+--+

Run the same query via Spark SQL: it produces an incorrect decimal value.

scala> val df_hive = spark.sql("select * from db1.test_precision")
df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(18,6)]
scala> df_hive.show
+-----+-----------+
| name|      value|
+-----+-----------+
|dummy|-301.989888|
+-----+-----------+

Note: I know that an explicit cast(value as Decimal(18,6)) in step 1 would solve this, but I already have historical data that I cannot reload right away.
Is there a way to fix this while reading the value in step 5?
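
For illustration, one read-side workaround (an editor's sketch, not from the original question): bypass the table schema by reading the Parquet files directly, so that Spark picks up the file's decimal(28,26), then cast down explicitly. The path is the placeholder location used above.

// Editor's sketch: read the Parquet files directly so Spark uses the file
// schema decimal(28,26), then cast to the type the table was meant to expose.
val fixed = spark.read
  .parquet("/my/hdfs/location/test")
  .selectExpr("name", "cast(value as decimal(18,6)) as value")

fixed.show()   // value comes back as 10.700000 rather than garbage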


vvppvyoh1#

I reproduced your example exactly, except for step 3. When creating the table, you should keep the same precision and scale for the decimal type.
In your case, you created a decimal(28,26):

df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

So you should create the table with that same precision and scale for the decimal type.
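
As an aside (editor's inference, not part of the original answer), the specific garbage value is consistent with the stored unscaled integer being truncated and reread under the table's narrower type: the file stores 10.70000000000000000000000000 as the unscaled value 1070000000000000000000000000 with scale 26, and keeping only the low 32 bits of that integer while applying the table's scale of 6 reproduces the observed value exactly.

// Editor's sketch of the reinterpretation that matches the observed output.
val unscaled = BigInt("1070000000000000000000000000") // 10.7 at scale 26
val low32 = unscaled.toInt                            // low 32 bits only: -301989888
println(BigDecimal(BigInt(low32), 6))                 // -301.989888

With matching precision and scale, the full rerun below returns the correct value: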

hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
/**AND NOT**/
hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED AS PARQUET LOCATION '/my/hdfs/location/test';
scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

scala> df.show()
+-----+--------------------+
| name|               value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+

scala> df.printSchema()
root
 |-- name: string (nullable = false)
 |-- value: decimal(28,26) (nullable = false)

scala> df.write.mode("overwrite").parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")

scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

scala> df_parq.printSchema
root
 |-- name: string (nullable = true)
 |-- value: decimal(28,26) (nullable = true)

scala> df_parq.show
+-----+--------------------+
| name|               value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+

hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';

hive> select * from test.test_precision;

+----------------------+-----------------------+--+
| test_precision.name  | test_precision.value  |
+----------------------+-----------------------+--+
| dummy                | 10.7                  |
+----------------------+-----------------------+--+

scala> val df_hive = spark.sql("select * from test.test_precision")
df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

scala> df_hive.show
+-----+--------------------+
| name|               value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+

scala> df_hive.printSchema
root
 |-- name: string (nullable = true)
 |-- value: decimal(28,26) (nullable = true)
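
A practical note on the asker's concern about historical data (editor's note, not from the original answer): because the table is EXTERNAL, dropping and recreating it only changes the metastore definition; the Parquet files under the location stay in place, so nothing needs to be reloaded. A sketch, assuming a Spark session with Hive support enabled:

// Editor's sketch: redeclare the external table with the type actually stored
// in the files. Dropping an EXTERNAL table does not delete the Parquet data.
spark.sql("DROP TABLE IF EXISTS db1.test_precision")
spark.sql("""
  CREATE EXTERNAL TABLE db1.test_precision (name STRING, value DECIMAL(28,26))
  STORED AS PARQUET
  LOCATION '/my/hdfs/location/test'
""")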
