creting非原始数据类型的udf函数及其在sparksql查询中的应用:scala

gudnpqoy  于 2021-06-24  发布在  Hive
关注(0)|答案(2)|浏览(317)

我正在scala中创建一个我想在spark sql查询中使用的函数。我的查询在hive中运行良好,或者如果我在spark sql中提供相同的查询,但在多个位置使用相同的查询,那么我希望将其创建为可重用的函数/方法,以便在需要时可以调用它。我在scala类中创建了以下函数。

def date_part(date_column:Column) = {
    val m1: Column = month(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM-yyyy")))) //give  value as 01,02...etc

    m1 match {
        case 01 => concat(concat(year(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM- yyyy"))))-1,'-'),substr(year(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM-yyyy")))),3,4))
        //etc..
        case _ => "some other logic"
    }
}

但它显示出多重错误。
对于01:
◾十进制整数文字不能有前导零(八进制语法已过时。)
◾类型不匹配;找到:int(0)必需:org.apache.spark.sql.column。
对于“-”:
类型不匹配;找到:需要char('-'):org.apache.spark.sql.column。
对于“substr”:
未找到:值substr。
另外,如果我创建的任何简单函数的类型也是column,我就不能注册它,因为我得到的错误在columnar格式中是不可能的,int)它的工作很好。但在我的情况下,类型是列,所以我不能这样做。有人请指导我该怎么做。截至目前,我发现堆栈溢出,我需要使用此函数与df,然后需要将此df转换为临时表。有人请指导我任何其他替代方式,所以没有太多的变化,我现有的代码我可以使用此功能。

ohfgkhjo

ohfgkhjo1#

试试下面的代码。

scala> import org.joda.time.format._
import org.joda.time.format._

scala> spark.udf.register("datePart",(date:String) => DateTimeFormat.forPattern("MM-dd-yyyy").parseDateTime(date).toString(DateTimeFormat.forPattern("MMyyyy")))
res102: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> spark.sql("""select datePart("03-01-2019") as datepart""").show
+--------+
|datepart|
+--------+
|  032019|
+--------+
eeq64g8w

eeq64g8w2#

首先,spark需要读取一个存储数据的文件,我猜这个文件是csv,但是可以使用json方法代替csv。
然后,您可以添加具有计算值的新列,如下所示:

import org.apache.spark.sql.functions._

      val df = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("/path/mydata.csv")

      def transformDate( dateColumn: String, df: DataFrame) : DataFrame = {
         df.withColumn("calculatedCol", month(to_date(from_unixtime(unix_timestamp(col(dateColumn), "dd-MM-yyyy")))))

         df.withColumn("newColumnWithDate",  when(col("calculatedCol") === "01", concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1, lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM-yyyy"))),4,2))
          .when(col("calculatedCol") === "02","some other logic")
          .otherwise("nothing match")))
      }

     // calling your function for the Dataframe you want transform date column:
     transformDate("date_column", df)

注意有些函数需要一个列作为参数,而不是字符串值,所以请使用lit()指定该值。
不需要自定义项(在性能方面也不推荐),但您可以按以下方式使用它:

val upper: String => String = _.toUpperCase
import org.apache.spark.sql.functions.udf
val upperUDF = udf(upper)
df.withColumn("upper", upperUDF('text)).show

其中'upper'函数将是方法,您必须包含转换日期列的逻辑。

相关问题