Spark 3.3(scala)中UDF函数的问题

gt0wga4j  于 12个月前  发布在  Scala
关注(0)|答案(1)|浏览(228)

我们正在将一些代码从spark 2.4更改为spark 3.3,并且我们的UDF函数出现了问题。我们生成包含所有逻辑的JAR(在修改POM中的库版本之后),并在Apache Spark Pool中从Azure运行它。
当我们在2.4版本中运行时,代码工作正常,但现在在3.3版本中,我们遇到了错误。
最重要的日志如下:

Caused by: java.lang.IllegalStateException: SparkSession should only be created and accessed on the driver.
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$assertOnDriver(SparkSession.scala:1159)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:927)
at com.XXXX.AAAA.BBBB.api.Functions$.<init>(Functions.scala:21)
at com.XXXX.AAAA.BBBB.api.Functions$.<clinit>(Functions.scala)

WARN TaskSetManager: Lost task 103.0 in stage 8.0 (TID 2936) (vm-eXXXXXXX executor 43): java.lang.NoClassDefFoundError: Could not initialize class com.XXXX.AAAA.BBBB.api.Functions

代码如下:

object Functions {

    /********************************************************************/
    private val spark = SparkContextCustom.spark
    import spark.implicits._
    
    val getDate = udf((year: Int, month: Int) => {
        val newDate = LocalDate.of(year, month, 1)
          .minusDays(1)
          .minusDays(newDate.getDayOfWeek().getValue() - 1)
        newDate.getDayOfMonth()
    })

    spark.udf.register("getDate", getDate)
        
        
    def changeValueInCol(col1: Column, col2: Column): Column = {

        val year = "2023" // example
        val month = "10"
        val day = "10"
        lazy val calculatedDay = getDate(year, lit(3))
        
        .when(month === lit(3) && day < calculatedDay, lit("I")) 
        .otherwise(col2) 
        
  }
  
    def filterTable(df: DataFrame): DataFrame = {
        /* More logic*/
        df.changeValueInCol($"col1", $"col2") 
        
    }
  
}

在这里我们调用函数:

object XXXX extends App {
  
    /********************************************************************/
    val spark = SparkContextCustom.spark
    import spark.implicits._
    implicit val paralellism: Int = SparkContextCustom.parallelism

    /* After some logic */
    val df_filtered = Functions.filterTable(df)
    
    SparkContextCustom.stop
    
}

我正在尝试几件事,但总是得到“SparkSession应该只在驱动程序上创建和访问”错误。从笔记本中运行代码可以正常工作,但在JAR中不行。

xytpbqjk

xytpbqjk1#

是以下函数中的一个。scala:21:

private val spark = SparkContextCustom.spark
spark.udf.register("getDate", getDate)

如果只能在驱动程序上执行,并且您正在执行程序中使用函数中的其他内容,请将这些内容移到另一个对象中,然后重试。任何对Functions的引用都将触发该代码并尝试使用SparkContext/Session等。这是遗嘱执行人不允许的它在2.4中起作用主要是运气。

相关问题