I have an input Dataset[Input] on which I want to run a transformation function. The transformation must support multiple implementations, and each implementation should always return a Dataset[Layout], where the Key and Value type parameters of the Layout class differ per implementation.

I tried to model this with a trait that takes type parameters K and V and declares a transform function, and then implemented that trait for each layout, as in the code below.

The problem is that inside the Dataset.transform call, the encoder cannot resolve the type parameters K and V as Products. What is the correct way to express this requirement?
import scala.language.existentials
import org.apache.spark.sql.{Dataset, Encoders}

// Given
case class Input(key: String, id: String, name: String, value: String, metadata: String)

trait ExportLayout[K <: Product, V <: Product] {
  case class Layout(Key: K, Value: V)
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout]
}

object DefaultLayout {
  case class Key(id: String, key: String, name: String)
  case class Value(value: String, metadata: String)
}

case class DefaultLayout() extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.metadata))
    }(Encoders.product[Layout])
  }
}

object AnotherLayout {
  case class Key(???)
  case class Value(???)
}

case class AnotherLayout() extends ExportLayout[AnotherLayout.Key, AnotherLayout.Value] {
  import AnotherLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row => Layout(Key(???), Value(???)) }(Encoders.product[Layout])
  }
}

// Test
val rows = Seq(
  Input("111", "cn", "A", "10", "a"),
  Input("222", "cn", "B", "20", "b")
)
val ds = spark.createDataset(rows)(Encoders.product[Input])
val layoutDS = ds.transform(DefaultLayout().transformFn)
/* throws error:
type K is not a class
scala.ScalaReflectionException: type K is not a class
at scala.reflect.api.Symbols$SymbolApi$class.asClass(Symbols.scala:275)
at scala.reflect.internal.Symbols$SymbolContextApiImpl.asClass(Symbols.scala:84)
at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:707)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:91)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:72)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:71)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:639)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
at DefaultLayout.transformFn(ExportLayout.scala:_)
*/
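For reference, one direction I have been experimenting with (not yet verified as the idiomatic fix) is to lift Layout out of the trait into a top-level generic case class, and to require TypeTag evidence from every implementation, so that Encoders.product only ever sees fully bound types. A minimal sketch, reusing Input and the DefaultLayout companion from above; layoutEncoder and the switch from trait to abstract class are my own additions:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Layout is now a top-level generic case class, so once K and V are
// bound to concrete case classes it is an ordinary Product type.
case class Layout[K <: Product, V <: Product](Key: K, Value: V)

// Each concrete implementation must supply TypeTags for K and V,
// which lets the compiler materialize TypeTag[Layout[K, V]] here
// and hand it to Encoders.product.
abstract class ExportLayout[K <: Product : TypeTag, V <: Product : TypeTag] {
  protected implicit val layoutEncoder: Encoder[Layout[K, V]] =
    Encoders.product[Layout[K, V]]
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout[K, V]]
}

case class DefaultLayout() extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout[Key, Value]] =
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.metadata))
    }(layoutEncoder)
}

The switch from trait to abstract class is because context bounds desugar into implicit constructor parameters, which Scala 2 traits cannot have. Is this the right approach, or is there a cleaner way to keep Layout nested in the trait?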