Snowpark Scala中未知的用户定义函数

ozxc1zmp  于 2023-10-18  发布在  Scala
关注(0)|答案(1)|浏览(126)

我是新来的Snowpark。我有一个spark-scala项目。我要迁移到斯卡拉的雪场。我的项目中有这个udf

def maskColumnsUDF(maskChar : String = "*", noOfLeftVisibleChars : Int = 1, noOfRightVisiblechars : Int = 1): UserDefinedFunction = udf((attrVal :String) => {

      var finalString =  ""
      if(attrVal == null){ attrVal }
      else if (attrVal.length > 4 ) {
        val attrArray = attrVal.toCharArray
        for (counter <- 0 to attrArray.length() - 1) {
          if (counter < noOfLeftVisibleChars || counter > attrArray.length() - (noOfRightVisiblechars + 1){
            finalString = finalString + attrArray(counter)
          }
          else {
            finalString = finalString + maskChar
          }
        }
        finalString
      }
      else { "***" }
})

当我调用udf时:

columnNamesList.foreach(columnName => {
    sampleDf = sampleDf.withColumn(columnName+"_zs_masked",
    UserDefinedFunctions.maskColumnsUDF("*",1,1)(col(columnName)))
})

我得到这个错误
[main]错误com.snowflake.snowpark.internal.ServerConnection -无法分析查询的架构:线程“main”出现异常net.snowflake.client.jdbc.SnowflakeSQLException:SQL编译错误:未知的用户定义函数SNOWPARK_TEST.PUBLIC.SNOWPARK_TEMP_FUNCTION_GICGX7MLYN3OHZO
我甚至尝试了像这样简单的udf

val schema = StructType(Seq(
    StructField("column_name", StringType, nullable = false),
    StructField("featureName", StringType, nullable = false),
    StructField("distribution", DoubleType, nullable = false)
  ))

val data = Seq(
    Row("A", "foo", 0.2),
    Row("A", "bar", 0.5),
    Row("B", "baz", 0.1),
    Row("B", "qux", 0.3)
)

var featureDF: DataFrame = session.createDataFrame(data, schema)
featureDF = featureDF.withColumn("test_udf", UserDefinedFunctions.test_udf(col("column_name")))

得到这个
[main] ERROR com.snowflake.snowpark.internal.ServerConnection -无法执行查询:线程“main”出现异常net.snowflake.client.jdbc.SnowflakeSQLException:SQL编译错误:未知的用户定义函数SNOWPARK_TEST.PUBLIC.SNOWPARK_TEMP_FUNCTION_R0KRMUFIRRFYNUX
我没有添加它在这两种情况下尝试执行的sql查询,因为查询太长

lokaqttq

lokaqttq1#

我尝试了一些类似于你的代码。我甚至创建了一个像这样的测试scala proc:

create procedure test_proc()
    returns string
    language scala
    runtime_version = 2.12
    packages =('com.snowflake:snowpark:latest')
    handler = 'Procedure.main'
    as '

import com.snowflake.snowpark._
import com.snowflake.snowpark.types._
import com.snowflake.snowpark.functions._

object Procedure {
  def main(session: Session): String = {
    import java.io.{ByteArrayOutputStream, PrintStream}
    val outputStream = new ByteArrayOutputStream()
    Console.withOut(new PrintStream(outputStream)) {
        def maskColumnsUDF(maskChar : String = "*", noOfLeftVisibleChars : Int = 1, noOfRightVisiblechars : Int = 1): UserDefinedFunction = udf((attrVal :String) => {        
              var finalString =  ""
              if(attrVal == null){ attrVal }
              else if (attrVal.length > 4 ) {
                val attrArray = attrVal.toCharArray
                for (counter <- 0 to attrArray.length() - 1) {
                  if (counter < noOfLeftVisibleChars || counter > attrArray.length() - (noOfRightVisiblechars + 1))
                  {
                    finalString = finalString + attrArray(counter)
                  }
                  else {
                    finalString = finalString + maskChar
                  }
                }
                finalString
              }
              else { "***" }
        })
        
        val schema = StructType(Seq(
            StructField("column_name", StringType, nullable = false),
            StructField("featureName", StringType, nullable = false),
            StructField("distribution", DoubleType, nullable = false)
          ))
    
        val data = Seq(
            Row("A", "foo", 0.2),
            Row("A", "bar", 0.5),
            Row("B", "baz", 0.1),
            Row("B", "qux", 0.3)
        )
        
        var featureDF: DataFrame = session.createDataFrame(data, schema)
        featureDF.withColumn("xx",maskColumnsUDF("*",1,1)(col("column_name"))).show()
    }
    outputStream.toString()
  }
}';

而且效果很好。因为这是临时UDF,我认为你可能会面临的是,你正在使用不同的会话,这就是为什么你得到的错误,该函数不存在。
您可以尝试以下操作:

def maskColumnsUDF(session:Session,maskChar : String = "*", noOfLeftVisibleChars : Int = 1, noOfRightVisiblechars : Int = 1): 
    UserDefinedFunction = {
        val udf_name = s"maskColumnsUDF_${noOfLeftVisibleChars}_${noOfRightVisiblechars}"
        session.sql(s"drop function if exists ${udf_name}(varchar)")
        session.udf.registerPermanent(
         udf_name,
         (attrVal :String) => {        
              var finalString =  ""
              if(attrVal == null){ attrVal }
              else if (attrVal.length > 4 ) {
                val attrArray = attrVal.toCharArray
                for (counter <- 0 to attrArray.length() - 1) {
                  if (counter < noOfLeftVisibleChars || counter > attrArray.length() - (noOfRightVisiblechars + 1))
                  {
                    finalString = finalString + attrArray(counter)
                  }
                  else {
                    finalString = finalString + maskChar
                  }
                }
                finalString
              }
              else { "***" }
        },"@mystage")
    }

这将使用应在所有会话中工作的永久性UDF。我只是建议诊断错误。最好确定会话是否不同。

相关问题