scala 如何让Spark在将DataFrame转换为具有较少字段的case类的Dataset时引发异常?

dw1jzc5e  于 2023-05-17  发布在  Scala
关注(0)|答案(2)|浏览(169)

假设我们有一个行类型为(col1: Int, col2: String)的DataFrame df。如果我有一个case类MyClass(col1: Int)(缺少col2: String),那么df.as[MyClass]不会引发异常。
我怎样才能让它引发一个异常,这样我就知道我的case类与DataFrame相比缺少了一些字段?

编辑

添加更多信息。我的主要用例是,如果上游数据源更改了它们的模式,则会失败。如果df的模式的列数较少,则可以正常工作; MyClass2(col1: Int, col2: String, col3: Double => df.as[MyClass2]失败,但有以下例外。

org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `col3` cannot be resolved. Did you mean one of the following? [`col1`, `col2`].
ncgqoxb0

ncgqoxb01#

.as[]方法在源代码中有一些文档(我链接到v3.4.0,这是本文发布之日的最新版本)。重要的一点是:
如果数据集的架构与所需的U类型不匹配,则可以根据需要使用select沿着aliasas来重新排列或重命名。
请注意,as[]只更改传递到类型化操作(如map())中的数据的视图,而不会立即投影掉指定类中不存在的任何列。
这意味着你看到的行为是完全预期的。由于.as[]不会急切地投影掉任何不存在于MyClass类中的列,因此您必须使用.select自己完成。
回答你的问题:通过只使用.as[]方法,你不能让这些错误发生。
请注意,您仍然可以对那些在模式中指定的列使用类型化操作。考虑以下示例:

scala> val df = Seq((1, 2), (3, 4)).toDF("col1", "col2")
scala> case class MyClass(col1: Int)

scala> df.show
+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+

scala> df.as[MyClass].map(_.col1 + 1).show
+-----+
|value|
+-----+
|    2|
|    4|
+-----+

如果真的想触发错误,可以通过手动比较模式来实现。一个非常简单的例子:

import org.apache.spark.sql.Encoders

val classSchema = Encoders.product[MyClass].schema
val dfSchema = df.schema

if (classSchema != dfSchema)
  throw new RuntimeException("Your schemas are not the same")
lvjbypge

lvjbypge2#

一种方法是使用SparkSessionExtensions.injectResolutionRule注册(inject)一个解析规则,这将扩展(improve)Analyzer的工作方式。
据说在分析之后,查询将产生一些结果。你必须阻止它,这就是解决规则的用武之地。
再进一步考虑,这样的基础设施不应该由用户代码(由开发人员编写)来处理,而应该由Spark SQL“平台”本身来处理。

相关问题