scala Spark Case类-DECIMAL类型编码器错误“无法从DECIMAL向上转换”

6xfqseft  于 2022-11-09  发布在  Scala
关注(0)|答案(6)|浏览(638)

我正在从MySQL/MariaDB提取数据,在创建数据集的过程中,数据类型出错
异常:无法将AMOUNT从DECIMAL(30,6)向上强制转换为DECIMAL(38,18),因为它可能截断目标对象的类型路径是:-field(class:“org.apache.park k.sql.tyes.Decimal”,name:“Amount”)-根类:“com.misp.spk.Deal”您可以向输入数据添加显式强制转换,也可以在目标对象中选择更高精度的字段类型;
Case类的定义如下

case class
(
AMOUNT: Decimal
)

有人知道如何修复它而不碰数据库吗?

8ljdwjyq

8ljdwjyq1#

该错误指出,ApacheSpark不能自动将数据库中的BigDecimal(30,6)转换为数据集中需要的BigDecimal(38,18)(我不知道它为什么需要固定的参数38,18。更奇怪的是,Spark不能自动将低精度的类型转换为高精度的类型)。
报告了一个错误:https://issues.apache.org/jira/browse/SPARK-20162(可能是您)。无论如何,我发现通过将列转换为DataFrame中的BigDecimal(38,18),然后将DataFrame转换为DataSet,可以很好地解决读取数据的问题。

//first read data to dataframe with any way suitable for you
var df: DataFrame = ???
val dfSchema = df.schema

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType
dfSchema.foreach { field =>
  field.dataType match {
    case t: DecimalType if t != DecimalType(38, 18) =>
      df = df.withColumn(field.name, col(field.name).cast(DecimalType(38,18)))
  }
}
df.as[YourCaseClassWithBigDecimal]

它应该能解决阅读方面的问题(但我想不能解决写作方面的问题)

qcbq4gxm

qcbq4gxm2#

如前所述,由于您的数据库使用DecimalType(30,6),这意味着您总共有30个槽和小数点之后的6个槽,这使得30-6=24用于小数点前面的区域。我喜欢称它为(24 left, 6 right)大小数。这当然不适合(20 left, 18 right)(即DecimalType(38,18)),因为后者在左侧没有足够的插槽(需要20个对24个)。我们在DecimalType(38,18)中只有20个左侧插槽,但我们需要24个左侧插槽来容纳您的DecimalType(30,6)
在这里,我们可以做的是将(24 left, 6 right)向下转换为(20 left, 6 right)(即DecimalType(26,6)),这样当它被自动转换为(20 left, 18 right)(即DecimalType(38,18))时,两边都会适合。您的DecimalType(26,6)将有20个左侧插槽,使其可以安装在DecimalType(38,18)中,当然,6个右侧插槽将适合18个插槽。
执行此操作的方法是在将任何内容转换为DataSet之前,对DataFrame运行以下操作:

val downCastableData = 
  originalData.withColumn("amount", $"amount".cast(DecimalType(26,6)))

那么转换到Dataset应该是可行的。
(实际上,您可以强制转换为任何(20 left, 6 right)或更小的值,例如(19 left, 5 right)等)。

muk1a3rh

muk1a3rh3#

虽然我没有解决方案,但我对正在发生的事情的理解是:
默认情况下,Spark会将case class中的Decimal类型(或BigDecimal)的架构推断为DecimalType(38, 18)(请参见org.apache.spark.sql.types.DecimalType.SYSTEM_DEFAULT)。38表示Decimal总共可以容纳38位(小数点左右),而18表示18其中38位保留给小数点右侧。这意味着Decimal(38, 18)的小数点左侧可能有20位数字。您的MySQL模式是decimal(30, 6),这意味着它可能包含小数点左侧24位(30-6)和小数点右侧6位的值。由于24位数大于20位数,因此在从MySQL模式转换为Decimal类型时,可能会有截断的值。
不幸的是,从Scala case class推断模式被Spark开发人员视为一种便利,他们选择不支持程序员为case class中的DecimalBigDecimal类型指定精度和小数位数(请参阅https://issues.apache.org/jira/browse/SPARK-18484)

nfeuvbwi

nfeuvbwi4#

基于@user2737635的回答,您可以使用foldLeft而不是foreach,以避免将数据集定义为var并重新定义它:

//first read data to dataframe with any way suitable for you
val df: DataFrame = ???
val dfSchema = df.schema

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType
dfSchema.foldLeft(df){ 
  (dataframe, field) =>  field.dataType match {
    case t: DecimalType if t != DecimalType(38, 18) => dataframe.withColumn(field.name, col(field.name).cast(DecimalType(38, 18)))
    case _ => dataframe
  }
}.as[YourCaseClassWithBigDecimal]
cgvd09ve

cgvd09ve5#

我们正在通过定义我们自己的Encoder来解决这个问题,我们在调用位置.as使用它。我们使用知道正确精度和小数位数的StructType生成Encoder(参见下面的代码链接)。
https://issues.apache.org/jira/browse/SPARK-27339

r1zk6ea1

r1zk6ea16#

根据pyspark的说法,Decimal(38,18)是默认设置。
创建DecimalType时,默认精度和小数位数为(10,0)。当从decimal.Decimal对象推断架构时,它将是DecimalType(38,18)。

相关问题