How to encode algebraic data types in Scala/Spark?

Posted by szqfcxe2 on 2021-05-24 in Spark

I have an algebraic data type that I want to use as a parameter of a case class, like this:

sealed abstract class DayOfWeek(val id: String)

object DayOfWeek {
  final object Sunday    extends DayOfWeek("sunday")
  final object Monday    extends DayOfWeek("monday")
  final object Tuesday   extends DayOfWeek("tuesday")
  final object Wednesday extends DayOfWeek("wednesday")
  final object Thursday  extends DayOfWeek("thursday")
  final object Friday    extends DayOfWeek("friday")
  final object Saturday  extends DayOfWeek("saturday")

  val members: List[DayOfWeek] = List(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday)

  def apply(id: String): DayOfWeek = members
    .map(member => (member.id, member))
    .toMap
    .apply(id)
}
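Both the UDT and the ExpressionEncoder attempts below store a value as its id string and rebuild it with DayOfWeek(id), so everything depends on that round trip being lossless. A minimal Spark-free sketch of checking it, assuming Scala 2.12+ (the line Spark 2.4.x uses). It builds the id map once instead of on every apply call, and the fromId helper is a hypothetical addition, not part of the original:

```scala
// Spark-free sketch of the ADT from the question, used to check the
// id-based round trip that a UDT or an encoder would rely on.
sealed abstract class DayOfWeek(val id: String)

object DayOfWeek {
  case object Sunday    extends DayOfWeek("sunday")
  case object Monday    extends DayOfWeek("monday")
  case object Tuesday   extends DayOfWeek("tuesday")
  case object Wednesday extends DayOfWeek("wednesday")
  case object Thursday  extends DayOfWeek("thursday")
  case object Friday    extends DayOfWeek("friday")
  case object Saturday  extends DayOfWeek("saturday")

  val members: List[DayOfWeek] =
    List(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday)

  // Built once, when the companion object is initialized.
  private val byId: Map[String, DayOfWeek] = members.map(m => m.id -> m).toMap

  // Same contract as the original: throws NoSuchElementException for an unknown id.
  def apply(id: String): DayOfWeek = byId(id)

  // Hypothetical non-throwing variant for untrusted input.
  def fromId(id: String): Option[DayOfWeek] = byId.get(id)
}

object RoundTripCheck extends App {
  // "serialize" = read .id, "deserialize" = look the id up again
  val lossless = DayOfWeek.members.forall(d => DayOfWeek(d.id) == d)
  println(lossless)                   // true
  println(DayOfWeek.fromId("funday")) // None
}
```

If the check fails (for example, two members sharing an id), no Spark-side encoding built on the id can be correct either.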

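The answers I have found here, such as this one, say there is no good way to do this. I don't think that's the case.
There seems to be a route using UserDefinedType together with UDTRegistration.register. Since Spark 2.x these have been marked private, but I tried putting my code in the org.apache.spark namespace to get around the access restriction. However, when I call .toDS on a Seq[DayOfWeek], it still says value toDS is not a member of Seq[DayOfWeek], even after calling the registration. So the registration is not being picked up.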

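package org.apache.spark

// Placed under org.apache.spark to reach the Spark-private UDT classes.
import org.apache.spark.sql.types.{DataType, StringType, UDTRegistration, UserDefinedType}
import org.apache.spark.unsafe.types.UTF8String
// (DayOfWeek also has to be imported here if it lives in another package.)

object DayOfWeekUDT {
  // Records "user class name -> UDT class name" in Spark's run-time registry.
  def register(): Unit = UDTRegistration.register(classOf[DayOfWeek].getName, classOf[DayOfWeekUDT].getName)
}

class DayOfWeekUDT extends UserDefinedType[DayOfWeek] {
  // Stored in Spark as a plain string column.
  override def sqlType: DataType = StringType
  // Spark's internal representation of a string is UTF8String.
  override def serialize(obj: DayOfWeek): Any = UTF8String.fromString(obj.id)
  // datum arrives as a UTF8String; toString recovers the id.
  override def deserialize(datum: Any): DayOfWeek = DayOfWeek(datum.toString)
  override def userClass: Class[DayOfWeek] = classOf[DayOfWeek]
}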
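About the value toDS is not a member of Seq[DayOfWeek] error: in Spark 2.x, toDS on a local Seq comes from the implicit conversion localSeqToDatasetHolder[T : Encoder] in spark.implicits._, so it only exists when the compiler can find an implicit Encoder for the element type. UDTRegistration.register is an ordinary method call with a run-time effect, so it cannot satisfy that compile-time lookup. The toy model below sketches the mechanism in plain Scala (2.12+, no Spark); ToyEncoder, ToyDataset, ToyDatasetHolder, ToyImplicits, ToyRegistry and the two-member Day type are all hypothetical stand-ins, not Spark classes.

```scala
import scala.language.implicitConversions

// Toy model (hypothetical names, not Spark's API) of how toDS is found.
trait ToyEncoder[T] { def encode(t: T): String }

final case class ToyDataset[T](rows: Seq[String])

final class ToyDatasetHolder[T](s: Seq[T], enc: ToyEncoder[T]) {
  def toDS(): ToyDataset[T] = ToyDataset[T](s.map(t => enc.encode(t)))
}

object ToyImplicits {
  // Same shape as Spark's localSeqToDatasetHolder[T : Encoder]: the conversion
  // that adds toDS only applies if an implicit ToyEncoder[T] can be found.
  implicit def seqToHolder[T](s: Seq[T])(implicit enc: ToyEncoder[T]): ToyDatasetHolder[T] =
    new ToyDatasetHolder(s, enc)
}

// Run-time registry, loosely analogous to UDTRegistration.
object ToyRegistry {
  private val entries = scala.collection.mutable.Map.empty[String, String]
  def register(userClass: String, udtClass: String): Unit = entries(userClass) = udtClass
}

// Hypothetical two-member ADT standing in for DayOfWeek.
sealed abstract class Day(val id: String)
object Day {
  case object Sat extends Day("saturday")
  case object Sun extends Day("sunday")
}

object ToyDemo {
  import ToyImplicits._

  def withoutEncoder(): Unit = {
    ToyRegistry.register(classOf[Day].getName, "DayUDT")
    // Seq[Day](Day.Sat).toDS()
    // ^ would not compile (same kind of "value toDS is not a member" error):
    //   the register call above supplies no implicit ToyEncoder[Day].
  }

  def withEncoder(): Seq[String] = {
    implicit val dayEncoder: ToyEncoder[Day] = new ToyEncoder[Day] {
      def encode(d: Day): String = d.id
    }
    Seq[Day](Day.Sat, Day.Sun).toDS().rows // List("saturday", "sunday")
  }
}
```

In Spark terms, this suggests the registration alone cannot make Seq[DayOfWeek].toDS compile; some implicit Encoder[DayOfWeek] has to be visible. An implicit defined in the DayOfWeek companion object, as in the ExpressionEncoder attempt, belongs to the implicit scope of Encoder[DayOfWeek] and is found without an import.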

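Another route is to create an implicit val of type Encoder[DayOfWeek] using ExpressionEncoder. I searched GitHub for examples, but the few I found didn't fit my particular needs, and I couldn't understand them well enough to write my own working version. The solution needs to work on Spark 2.x (2.4.x in my case); it is just a question of how to use this machinery. Here is what I tried, also inside the DayOfWeek companion object: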

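  // Continues inside `object DayOfWeek` (Spark 2.4.x catalyst internals).
  import scala.reflect.ClassTag
  import org.apache.spark.sql.Encoder
  import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
  import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
  import org.apache.spark.unsafe.types.UTF8String

  private val clazz: Class[DayOfWeek] = classOf[DayOfWeek]

  // The external DayOfWeek value being serialized (input slot 0).
  private val inputObject: BoundReference = BoundReference(0, ObjectType(clazz), false)

  // UTF8String.fromString(input.id)
  private val converter = StaticInvoke(
    classOf[UTF8String],
    StringType,
    "fromString",
    Invoke(inputObject, "id", ObjectType(classOf[String])) :: Nil
  )

  // A single string column named "day_of_week".
  private val serializer: Seq[Expression] = CreateNamedStruct(Literal("day_of_week") :: converter :: Nil).flatten

  // DayOfWeek.apply(column0.toString)
  private val deserializer: Expression = StaticInvoke(
    staticObject = DayOfWeek.getClass,
    dataType = ObjectType(clazz),
    functionName = "apply",
    arguments = Invoke(
      targetObject = UpCast(
        child = GetColumnByOrdinal(0, StringType),
        dataType = StringType,
        walkedTypePath = "- root class: DayOfWeek" :: Nil
      ),
      // The column holds a UTF8String, which has no `id` method.
      functionName = "toString",
      dataType = ObjectType(classOf[String])
    ) :: Nil,
    propagateNull = false,
    returnNullable = false
  )

  implicit val encoder: Encoder[DayOfWeek] = new ExpressionEncoder[DayOfWeek](
    // Field name kept in sync with the serializer's "day_of_week" alias.
    schema = StructType(Seq(StructField("day_of_week", StringType, false))),
    flat = true,
    serializer = serializer,
    deserializer = deserializer,
    clsTag = ClassTag(classOf[DayOfWeek])
  )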

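Does anyone know how to do this? I like the UserDefinedType concept: converting back and forth between a Spark data type and the ADT is very clear. ExpressionEncoder, on the other hand, looks as if it was written to be as obscure as possible.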
