如何在spark数据集中存储嵌套的自定义对象?

yeotifhr  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(605)

问题是如何在数据集中存储自定义对象?
spark版本:3.0.1
可以实现非嵌套自定义类型:

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class AnObj(val a: Int, val b: String)

implicit val myEncoder: Encoder[AnObj] = Encoders.kryo[AnObj] 

val d = spark.createDataset(Seq(new AnObj(1, "a")))

d.printSchema
root
 |-- value: binary (nullable = true)

但是,如果自定义类型嵌套在 product 类型(即。 case class ),它给出了一个错误:
java.lang.unsupportedoperationexception:找不到innerobj的编码器

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class InnerObj(val a: Int, val b: String)
case class MyObj(val i: Int, val j: InnerObj)

implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj] 

// error
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))
// it gives Runtime error: java.lang.UnsupportedOperationException: No Encoder found for InnerObj

我们如何创造 Dataset 嵌套自定义类型?

hrysbysz

hrysbysz1#

除了sujesh之外的另一个解决方案:

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class InnerObj(val a: Int, val b: String)
case class MyObj[T](val i: Int, val j: T)

implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]] 

// works
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))

这也显示了内部类型可以从 type parameter ,以及无法推断的情况。
前一种情况应该这样做:

implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]]

后一种情况应该这样做:

implicit val myEncoder1: Encoder[InnerObj] = Encoders.kryo[InnerObj]
implicit val myEncoder2: Encoder[MyObj] = Encoders.kryo[MyObj]
yyyllmsg

yyyllmsg2#

为myobj和innerobj添加编码器应该可以让它工作。

class InnerObj(val a:Int, val b: String)
  case class MyObj(val i: Int, j: InnerObj)

  implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj]
  implicit val objEncoder: Encoder[MyObj] = Encoders.kryo[MyObj]

上面的代码段可以很好地编译和运行

相关问题