为date创建和使用spark-hive自定义项

8zzbczxx  于 2021-06-24  发布在  Hive
关注(0)|答案(2)|浏览(219)

注意:这个问题是从这个问题链接而来的:creting udf function with nonprimitive data type and using in spark sql query:scala
我在scala中创建了一个方法:

package test.udf.demo
    object UDF_Class {
    def transformDate( dateColumn: String, df: DataFrame) : DataFrame = {
    val sparksession = SparkSession.builder().appName("App").getOrCreate()
    val d=df.withColumn("calculatedCol", month(to_date(from_unixtime(unix_timestamp(col(dateColumn),  "dd-MM-yyyy")))))
    df.withColumn("date1",  when(col("calculatedCol") === "01",  concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1,  lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM- yyyy"))),3,4))
    .when(col("calculatedCol") ===  "02",concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1,  lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM- yyyy"))),3,4)))
    .when(col("calculatedCol") ===  "03",concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1,  lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM-yyyy"))),3,4)))
    .otherwise(concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM-  yyyy")))), lit('-')), substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM-yyyy")))) + 1, 3, 4))))) 
    val d1=sparksession.udf.register("transform",transformDate _)
    d
    }
    }

我想在sparksql查询中使用这个transformdate方法,它是同一个包中单独的scala代码。

package test.udf.demo
    import test.udf.demo.transformDate
    //sparksession
    sparksession.sql("select id,name,salary,transform(dob) from dbname.tablename")

但我犯了个错误
不是默认数据库中的临时或永久注册函数
有人能指引我吗?

k10s72fa

k10s72fa1#

首先,sparksqludf是一个基于行的函数。不是基于Dataframe的方法。聚合自定义项还采用一系列行。因此,自定义项的定义是错误的。如果我正确理解了您的需求,那么您需要创建case语句的可配置表达式。它可以很容易地通过expr()实现

import spark.implicits._
val exprStr = "case when calculatedCol='01' then <here goes your code statements> as FP"
val modifiedDf = sql("""select id,name,salary,$exprStr  from dbname.tablename""")

会有用的

j8yoct9x

j8yoct9x2#

afaik spark用户定义的自定义项不能接受或返回 DataFrame . 这是阻止你的自定义项注册

相关问题