How to override a native Spark/Hive UDF with SparkSession in Spark 2. This is needed to change the default behaviour that Spark/Hive ships with, which in turn lets us support a legacy code base. The built-in `trunc` function is used as the example.

Observation: I can override the native function in both Hive and Spark, but I cannot get method overloading to work in Spark 2 through SparkSession.
Table description:
hive> desc sample_table;
id int
name string
time_stamp timestamp
Viewing the records:
hive> select * from sample_table;
1 Pratap Chandra Dhan NULL
2 Dyuti Ranjan Nayak 2016-01-01 00:00:00
3 Rajesh NULL
Custom Hive UDF
package com.spark2.udf;

import java.sql.Timestamp;

import org.apache.hadoop.hive.ql.exec.UDF;

public class Trunc extends UDF {

    // trunc(timestamp): day of the week (0 = Sunday .. 6 = Saturday)
    public Integer evaluate(Timestamp input) {
        if (input == null) {
            return null;
        } else {
            return input.getDay();
        }
    }

    // trunc(bigint): the value scaled by 1000
    public Long evaluate(Long input) {
        if (input == null) {
            return null;
        } else {
            return input * 1000;
        }
    }

    // trunc(timestamp, string): zero-based month concatenated with the string
    public String evaluate(Timestamp input, String str) {
        if (input == null) {
            return null;
        } else {
            return input.getMonth() + "_" + str;
        }
    }
}
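Note that `getDay()` and `getMonth()` are the deprecated `java.util.Date` getters, which explains the values seen in the query output below; a minimal check outside Hive:

```java
import java.sql.Timestamp;

public class TruncCheck {
    public static void main(String[] args) {
        // 2016-01-01 is the timestamp stored for id = 2 in sample_table
        Timestamp ts = Timestamp.valueOf("2016-01-01 00:00:00");

        // getDay() is the deprecated day-of-week getter: 0 = Sunday .. 6 = Saturday.
        // 2016-01-01 was a Friday, hence the "5" seen in the query output.
        System.out.println(ts.getDay());    // 5

        // getMonth() is zero-based, so January is 0 -- which is why the
        // two-argument variant prints "0_Dyuti Ranjan Nayak", not "1_...".
        System.out.println(ts.getMonth() + "_Dyuti Ranjan Nayak");
    }
}
```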
Running the built-in `trunc` function with a single argument fails, because it expects two:
hive> select trunc(time_stamp) from sample_table;
FAILED: SemanticException [Error 10015]: Line 1:7 Arguments length mismatch 'time_stamp': trunc() requires 2 argument, got 1
Now let's override it with the class above. For that we need to add the jar and register the `trunc` function.
hive> list jars;
/*add jars*/
hive> add jar spar2-udf-0.0.1-SNAPSHOT.jar;
Added [spar2-udf-0.0.1-SNAPSHOT.jar] to class path
Added resources: [spar2-udf-0.0.1-SNAPSHOT.jar]
/*register trunc function*/
hive> CREATE TEMPORARY FUNCTION trunc AS "com.spark2.udf.Trunc";
OK
Time taken: 0.25 seconds
/*test trunc function that takes timestamp*/
hive> select trunc(time_stamp) from sample_table;
OK
NULL
5
NULL
Time taken: 0.287 seconds, Fetched: 3 row(s)
/*test all function behaviour*/
hive> select trunc(id),trunc(time_stamp),trunc(time_stamp,name) from sample_table;
OK
1000 NULL NULL
2000 5 0_Dyuti Ranjan Nayak
3000 NULL NULL
Time taken: 0.054 seconds, Fetched: 3 row(s)
Approach 1: Using SparkSession in Spark 2 [does not work]:
scala> spark.sql("list jars").show;
+-------+
|Results|
+-------+
+-------+
scala> spark.sql("add jar spar2-udf-0.0.1-SNAPSHOT.jar").show;
+------+
|result|
+------+
| 0|
+------+
scala> spark.sql("list jars").collect.foreach(println)
[spark://10.113.57.185:47278/jars/spar2-udf-0.0.1-SNAPSHOT.jar]
scala> spark.sql("CREATE TEMPORARY FUNCTION trunc AS 'com.spark2.udf.Trunc'").collect.foreach(println)
org.apache.spark.sql.AnalysisException: Function trunc already exists;
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1083)
at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:63)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:636)
... 48 elided
Approach 2: Using HiveContext in Spark 2 [does not work]:
This approach worked up to Spark 1.6, but HiveContext is deprecated in Spark 2.x, and the registration fails the same way.
scala> import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.HiveContext
scala> val hiveContext = new HiveContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@e4b9a64
scala> hiveContext.sql("CREATE TEMPORARY FUNCTION trunc AS 'com.spark2.udf.Trunc'").collect.foreach(println)
org.apache.spark.sql.AnalysisException: Function trunc already exists;
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1083)
at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:63)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:636)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)
... 48 elided
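Since Spark 2.x, the documented replacement for HiveContext is a SparkSession built with Hive support enabled. A minimal sketch (the app name is a placeholder; this needs a Spark deployment to run, and the registration still collides with the built-in `trunc` exactly like the HiveContext attempt above):

```java
import org.apache.spark.sql.SparkSession;

public class HiveEnabledSession {
    public static void main(String[] args) {
        // SparkSession with Hive support is the Spark 2.x replacement for HiveContext
        SparkSession spark = SparkSession.builder()
                .appName("udf-override")
                .enableHiveSupport()
                .getOrCreate();

        // Fails with "Function trunc already exists", same as above
        spark.sql("CREATE TEMPORARY FUNCTION trunc AS 'com.spark2.udf.Trunc'");
    }
}
```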
Approach 3: Using spark.udf.register in Spark 2 [works]:
scala> spark.udf.register("trunc", (input: java.sql.Timestamp, str: String) => input.getMonth() + "_" + str)
...
scala> spark.sql("select trunc(time_stamp,name) from sample_table where id=2").collect.foreach(println);
[0_Dyuti Ranjan Nayak]
However, method overloading still does not work, and the following query fails:
scala> spark.sql("select trunc(id),trunc(time_stamp),trunc(time_stamp,name) from sample_table").collect.foreach(println)
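`spark.udf.register` binds exactly one function per name, so there is no overload resolution at that level. A common workaround (a sketch, with hypothetical names `trunc_id`, `trunc_day`, and `trunc_label`; needs a Spark deployment to run) is to register each signature under its own name, mirroring the three `evaluate()` overloads:

```java
import java.sql.Timestamp;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

public class RegisterTruncVariants {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("trunc-variants")
                .enableHiveSupport()
                .getOrCreate();

        // One registered name per signature, since a name cannot be overloaded
        spark.udf().register("trunc_id",
                (UDF1<Long, Long>) in -> in == null ? null : in * 1000,
                DataTypes.LongType);
        spark.udf().register("trunc_day",
                (UDF1<Timestamp, Integer>) in -> in == null ? null : in.getDay(),
                DataTypes.IntegerType);
        spark.udf().register("trunc_label",
                (UDF2<Timestamp, String, String>) (in, s) ->
                        in == null ? null : in.getMonth() + "_" + s,
                DataTypes.StringType);

        spark.sql("select trunc_id(id), trunc_day(time_stamp), trunc_label(time_stamp, name) "
                + "from sample_table").show();
    }
}
```

This loses the single `trunc` entry point, but it keeps all three behaviours callable from SQL in Spark 2.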