如何在spark2中使用sparksession覆盖本机spark/hive udf

q9rjltbz  于 2021-06-26  发布在  Hive
关注(0)|答案(0)|浏览(309)

如何在spark2中使用sparksession覆盖本机spark/hive udf。这是改变spark/hive提供的默认行为所必需的,这反过来又允许我们支持遗留代码库。“trunc”的使用示例。
观察:我可以在hive和spark中重写本机函数,但无法使用sparksession在spark2中实现方法重载。
表说明

hive> desc sample_table;
id                      int
name                    string
time_stamp              timestamp

查看记录:

hive> select * from sample_table;
1       Pratap Chandra Dhan     NULL
2       Dyuti Ranjan Nayak      2016-01-01 00:00:00
3       Rajesh  NULL

自定义配置单元自定义项

package com.spark2.udf;

import java.sql.Timestamp;
import org.apache.hadoop.hive.ql.exec.UDF;

public class Trunc extends UDF {

public Integer evaluate(Timestamp input) {
    if (input == null) {
        return null;
    } else {
        return input.getDay();
    }
}

public Long evaluate(Long input) {
    if (input == null) {
        return null;
    } else {
        return input * 1000;
    }
}

public String evaluate(Timestamp input, String str) {
    if (input == null) {
        return null;
    } else {
        return input.getMonth() + "_" + str;
    }
}
}

尝试使用一个参数运行“trunc”函数,它将失败,因为它需要两个参数

hive> select trunc(time_stamp)  from sample_table;
FAILED: SemanticException [Error 10015]: Line 1:7 Arguments length mismatch 'time_stamp': trunc() requires 2 argument, got 1

现在让我们使用上面的类重写,为此我们需要添加jar并注册trunc函数。

hive> list jars;

/*add jars*/
hive> add jar spar2-udf-0.0.1-SNAPSHOT.jar;
Added [spar2-udf-0.0.1-SNAPSHOT.jar] to class path
Added resources: [spar2-udf-0.0.1-SNAPSHOT.jar]

/*register trunc function*/
hive> CREATE TEMPORARY FUNCTION trunc AS "com.spark2.udf.Trunc";
OK
Time taken: 0.25 seconds

/*test trunc function that takes timestamp*/
hive> select trunc(time_stamp)  from sample_table;
OK
NULL
5
NULL
Time taken: 0.287 seconds, Fetched: 3 row(s)

/*test all function behaviour*/
hive> select trunc(id),trunc(time_stamp),trunc(time_stamp,name)  from 
sample_table;
OK
1000    NULL    NULL
2000    5       0_Dyuti Ranjan Nayak
3000    NULL    NULL
Time taken: 0.054 seconds, Fetched: 3 row(s)

方法一:在spark2中使用sparksession[不起作用]:

scala> spark.sql("list jars").show;
 +-------+
 |Results|
 +-------+
 +-------+

 scala> spark.sql("add jar spar2-udf-0.0.1-SNAPSHOT.jar").show;
 +------+
 |result|
 +------+
 |     0|
 +------+

 scala> spark.sql("list jars").collect.foreach(println)
 [spark://10.113.57.185:47278/jars/spar2-udf-0.0.1-SNAPSHOT.jar]

scala> spark.sql("CREATE TEMPORARY FUNCTION trunc AS 'com.spark2.udf.Trunc'").collect.foreach(println)
org.apache.spark.sql.AnalysisException: Function trunc already exists;     
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1083)
at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:63)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:636)
... 48 elided

方法二:在spark2中使用hivecontext[不起作用]:
这种方法在spark 1.6之前一直有效,但由于spark 2.x中不推荐使用hivecontext。这也行不通。

scala> import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.HiveContext

scala> val hiveContext = new HiveContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@e4b9a64

scala> hiveContext.sql("CREATE TEMPORARY FUNCTION trunc AS 'com.spark2.udf.Trunc'").collect.foreach(println)
org.apache.spark.sql.AnalysisException: Function trunc already exists;
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1083)
at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:63)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:636)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)
... 48 elided

方法三:在spark2中使用sparksession[有效]:

scala> spark.udf.register("trunc", (input:java.sql.Timestamp,str:String) => 
input.getMonth() + "_" + str)
...

scala> spark.sql("select trunc(time_stamp,name)  from sample_table where 
id=2").collect.foreach(println);
[0_Dyuti Ranjan Nayak]

但方法重载无法工作,以下查询将失败:

scala>spark.sql("select trunc(id),trunc(time_stamp),trunc(time_stamp,name)  
from sample_table").collect.foreach(println)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题