How to encode a Dataset type from generic type parameters defined in a trait - scala.ScalaReflectionException: type K is not a class

6kkfgxo0 · posted 2021-07-14 · in Spark

I have an input Dataset[Input] on which I want to run a transform function. The requirement is that this transform function can have multiple implementations and should always return a Dataset[Layout], where the Key and Value parameters of the Layout class differ for each implementation.
I tried creating a trait that takes type parameters K and V and declares the transform function, and then implementing that trait in concrete classes, as in the code below.
The problem is that inside the Dataset.transform call, the encoder cannot resolve the Product types behind the type parameters K and V. What is the correct way to implement this?

import scala.language.existentials
import org.apache.spark.sql.{Dataset, Encoders}

// Given
case class Input(key: String, id: String, name: String, value: String, metadata: String)

trait ExportLayout[K <: Product, V <: Product] {
  case class Layout(Key: K, Value: V)
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout]
}

object DefaultLayout {
  case class Key(id: String, key: String, name: String)
  case class Value(value: String, metadata: String)
}
case class DefaultLayout() extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.metadata))
    }(Encoders.product[Layout]) // throws here: reflection resolves Layout's fields to the trait's abstract K and V
  }
}

object AnotherLayout {
  case class Key(???)   // fields elided in the original post
  case class Value(???) // fields elided in the original post
}
case class AnotherLayout() extends ExportLayout[AnotherLayout.Key, AnotherLayout.Value] {
  import AnotherLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row => Layout(Key(???), Value(???)) }(Encoders.product[Layout])
  }
}

// Test
val rows = Seq(
  Input("111", "cn", "A", "10", "a"),
  Input("222", "cn", "B", "20", "b")
)
val ds = spark.createDataset(rows)(Encoders.product[Input])
val layoutDS = ds.transform(DefaultLayout().transformFn)

/* throws error:
type K is not a class
scala.ScalaReflectionException: type K is not a class
    at scala.reflect.api.Symbols$SymbolApi$class.asClass(Symbols.scala:275)
    at scala.reflect.internal.Symbols$SymbolContextApiImpl.asClass(Symbols.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:707)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:91)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:72)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:71)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:639)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
    at DefaultLayout.transformFn(ExportLayout.scala:_)

*/
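
For context on what fails here: org.apache.spark.sql.Encoders.product[T] derives the encoder through runtime reflection, which needs every field type of T to resolve to a concrete class. Because Layout is an inner case class of the trait, its fields are still typed by the trait's abstract parameters K and V, which is exactly what "type K is not a class" reports. Below is a minimal, untested sketch of one common workaround, assuming Layout can be lifted to a top-level generic case class and the trait turned into an abstract class so it can demand TypeTags; it reuses Input and the Key/Value companions from the question, and GenericExportLayout and layoutEncoder are illustrative names, not from the original post.

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Top level, so Spark's reflection sees a real class instead of a
// path-dependent inner type whose fields reference abstract K and V.
case class Layout[K <: Product, V <: Product](Key: K, Value: V)

// An abstract class (not a trait) so the context bounds can capture
// TypeTags for K and V at the point where concrete types are supplied.
abstract class GenericExportLayout[K <: Product : TypeTag, V <: Product : TypeTag] {
  // With TypeTag[K] and TypeTag[V] in scope, the compiler materializes
  // TypeTag[Layout[K, V]], so Encoders.product resolves concrete classes.
  implicit val layoutEncoder: Encoder[Layout[K, V]] = Encoders.product[Layout[K, V]]
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout[K, V]]
}

case class DefaultLayout() extends GenericExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout[Key, Value]] =
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.metadata))
    } // layoutEncoder is picked up implicitly
}

With this shape, ds.transform(DefaultLayout().transformFn) resolves the encoder from the concrete Key and Value types captured by the TypeTags rather than reflecting over the trait's abstract parameters. If the type must stay a trait, an alternative is to leave transformFn abstract and require each implementation to supply an implicit Encoder[Layout[K, V]] itself.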
