I have written a UDF that I want to use in Hive and in Spark (both the Scala and Python APIs). The function has the following structure:
public class Fun1 extends UDF {
    // initialized once per JVM/classloader, when the class is loaded
    private static final byte[] var1 = init();

    private static byte[] init() {
        String pass = null;
        try {
            ...
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
        return pass.getBytes();
    }

    public String evaluate(String val) {
        ...
        // re-run init() if the static field was not initialized
        if (var1 == null) {
            Fun1.init();
        }
        try {
            ...
            return output;
        } catch (Exception e) {
            return null;
        }
    }
}
I then registered it as a permanent function in Hive:
create function fun as 'com.functions.Fun1' using JAR 'maprfs:///data/udf/udf-1.0.5.jar';
The function works in Hive without any problems.
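For example, a query along these lines (just an illustration of how I call it; field1 and schema.table are the same column and table I use below) returns the expected output when run from Hive:

select default.fun(field1) from schema.table limit 10;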
I then tried the function on Spark 2.2.1 (Scala API), and that is where I started seeing strange behavior. In a spark-shell connected to the same Hive metastore I ran the following query:
sql("select default.fun(field1) from schema.table group by default.fun(field1)").show
It worked fine. However, when I ran the exact same query again in the same shell, I got the following error:
org.apache.spark.sql.AnalysisException: No handler for Hive UDF 'com.functions.Fun1': java.lang.ClassNotFoundException: com.functions.Fun1; line 1 pos 99
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.createFunction(HiveShim.scala:221)
I then started a new spark-shell with the jar containing the UDF class added explicitly, i.e.:
spark-shell --jars maprfs:///data/udf/udf-1.0.5.jar
With the jar added this way, the function works consistently, so it apparently needs the class on the classpath. What is strange is that without adding the jar, the query works the first time I run it and then fails on every subsequent run.
Does anyone know why this happens?
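For what it is worth, I assume that passing --jars is roughly equivalent to adding the jar from inside the session with something like the statement below (an assumption on my part; I have not checked whether it behaves any differently):

ADD JAR maprfs:///data/udf/udf-1.0.5.jar;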
I run into a different problem when I try the same UDF in PySpark. I see the following error on some of the executors; the query eventually completes, but several executors throw it:
20/06/09 10:52:22 WARN TaskSetManager: Lost task 51.0 in stage 4.0 (TID 225, server1, executor 41): org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String com.functions.Fun1.evaluate(java.lang.String) on object com.functions.Fun1@52e56022 of class com.functions.Fun1 with arguments {test==:java.lang.String} of size 1
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.invoke(FunctionRegistry.java:981)
at org.apache.spark.sql.hive.HiveSimpleUDF.eval(hiveUDFs.scala:91)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:188)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:355)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:106)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:97)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.invoke(FunctionRegistry.java:957)
... 21 more
Caused by: java.lang.NullPointerException
at com.functions.Fun1.init(Fun1.java:33)
at com.functions.Fun1.evaluate(Fun1.java:43)
... 26 more
I tried adding the jar to the pyspark shell as well, but that did not help. I would like to understand what causes the errors above. Should I create a temporary function in Spark instead of using the permanent function created in Hive? What is the correct approach in this case?
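To be concrete, by a temporary function I mean something along these lines, run through spark.sql from either the Scala or Python API (fun_tmp is just a placeholder name, and this is only a sketch of what I am considering, not something I have verified):

CREATE TEMPORARY FUNCTION fun_tmp AS 'com.functions.Fun1'
USING JAR 'maprfs:///data/udf/udf-1.0.5.jar';

select fun_tmp(field1) from schema.table group by fun_tmp(field1);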
Thanks