scala 使用Spark启用Kyro Serializer时出错

46scxncf  于 2023-03-23  发布在  Scala
关注(0)|答案(1)|浏览(126)

我使用scalaspark3.3.0中构建了一个管道。
我一直在使用Java串行化器,直到现在没有问题。两天前,我遇到了一些性能问题,并试图使用Kyro串行化器,而不是遵循这个gist没有成功。我得到了以下编译错误。
类型不匹配;发现:Array[Class[_〉:?0(在惰性值Spark中)(其他)0(在惰性值Spark中)(其他)(其他)?0(在惰性值Spark中)(其他)(其他)0(在惰性值Spark中)(其他)(其他)(其他)0(在惰性值Spark中)(其他)(其他)(其他)0(在惰性值Spark中)(其他)(其他)(其他)(其他)0(在惰性值Spark中)(some other)(some other)(some other)(some other)?0(在惰性值spark中)with Array[org.apache.spark.sql.types.DataType]... required:Array[Class[*]]注意:分类[*〉:?0 with?0 with?0 with?0 with?0 with?0 with?0 with Array[org.apache.spark.sql.types.DataType] with org.apache.spark.util.collection.BitSet with?0 with?0 with org.apache.spark.sql.catalyst.expressions.UnsafeRow with Array[org.apache.spark.sql.catalyst.InternalRow] with?0 with org.apache. spark.sql.types.ArrayType with org. apache. spark. sql. types.元数据带?0带?0带?0带?0带Array[org.apache.spark.sql.types.StructField]带org.apache.spark.sql.types.StructField带Array[org.apache.spark.sql.types.StructType]带org.apache.spark.sql.types.StructType带collection.mutable.WrappedArray.ofRef[_$1] forSome { type *$1 }]〈:类[*],但类Array在类型T中是不变的。您可能希望研究通配符类型,如_ <: Class[_]。(SLS 3.2.10)
这看起来像是一个类型不匹配的错误。但我不是scalaMaven,所以我不知道如何修复它。有人能帮我修复这个错误吗?
下面是我的代码。

def getKyroConfig() = {
    val conf = Array(
      classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
      classOf[org.apache.spark.sql.types.StructType],
      classOf[Array[org.apache.spark.sql.types.StructType]],
      classOf[org.apache.spark.sql.types.StructField],
      classOf[Array[org.apache.spark.sql.types.StructField]],
      Class.forName("org.apache.spark.sql.types.StringType$"),
      Class.forName("org.apache.spark.sql.types.LongType$"),
      Class.forName("org.apache.spark.sql.types.BooleanType$"),
      Class.forName("org.apache.spark.sql.types.DoubleType$"),
      Class.forName("[[B"),
      classOf[org.apache.spark.sql.types.Metadata],
      classOf[org.apache.spark.sql.types.ArrayType],
      Class.forName("org.apache.spark.sql.execution.joins.UnsafeHashedRelation"),
      classOf[org.apache.spark.sql.catalyst.InternalRow],
      classOf[Array[org.apache.spark.sql.catalyst.InternalRow]],
      classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow],
      Class.forName("org.apache.spark.sql.execution.joins.LongHashedRelation"),
      Class.forName("org.apache.spark.sql.execution.joins.LongToUnsafeRowMap"),
      classOf[org.apache.spark.util.collection.BitSet],
      classOf[org.apache.spark.sql.types.DataType],
      classOf[Array[org.apache.spark.sql.types.DataType]],
      Class.forName("org.apache.spark.sql.types.NullType$"),
      Class.forName("org.apache.spark.sql.types.IntegerType$"),
      Class.forName("org.apache.spark.sql.types.TimestampType$"),
      Class.forName("org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult"),
      Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
      Class.forName("scala.collection.immutable.Set$EmptySet$"),
      Class.forName("scala.reflect.ClassTag$$anon$1"),
      Class.forName("java.lang.Class")
    )

    conf
  }

val spark: SparkSession = {
    val conf_spark: SparkConf =  new SparkConf()

      val kyro_config = getKyroConfig()
      conf_spark.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      conf_spark.set("spark.kryo.registrationRequired", "true")
      conf_spark.set("spark.kryoserializer.buffer", "1024k")
      conf_spark.set("spark.kryoserializer.buffer.max", "1024m")
      conf_spark.registerKryoClasses(kyro_config)

    SparkSession.builder()
      .config(conf_spark)
      .getOrCreate()
  }
zmeyuzjn

zmeyuzjn1#

要使代码编译,显式提供val conf的类型就足够了:

val conf: Array[Class[_]] = Array(...

而不仅仅是

val conf = Array(...

否则类型推断不正确。
Array[Class[_ >: ... with ... with ....]]Array[Class[_]]是所谓的存在类型
What is an existential type?
https://stackoverflow.com/questions/tagged/existential-type%2bscala?tab=Votes
通常Scala会正确推断类型,但有时你必须提供提示。

相关问题