Different behavior of a custom Hive UDF in Spark

Asked by azpvetkf on 2021-05-29 · Spark

I have written a UDF that I want to use in both Hive and Spark (Scala and Python APIs). The function has the following structure:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.UDF;

    public class Fun1 extends UDF {

        // initialized once per JVM, when the class is loaded
        private static final byte[] var1 = init();

        private static byte[] init() {
            String pass = null;
            try {
                ...
            } catch (IOException e) {
                // the exception is swallowed; pass stays null on failure
                System.out.println(e.getMessage());
            }
            return pass.getBytes(); // throws NullPointerException if pass is still null
        }

        public String evaluate(String val) {
            ...
            if (var1 == null) {
                Fun1.init(); // note: the result is discarded; var1 is final and stays null
            }
            try {
                ...
                return output;
            } catch (Exception e) {
                return null;
            }
        }
    }

I then registered it as a permanent function in Hive:

    create function fun as 'com.functions.Fun1' using JAR 'maprfs:///data/udf/udf-1.0.5.jar';
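
For reference, a quick sanity check in Hive looks like this (run in beeline, using the same table and column as the Spark queries below):

    DESCRIBE FUNCTION default.fun;
    SELECT default.fun(field1) FROM schema.table LIMIT 10;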

The function works fine in Hive without any issues.
I then tried it on Spark 2.2.1 (using the Scala API) and started seeing strange behavior. In a spark-shell connected to the same Hive metastore, I tried the following query:

    sql("select default.fun(field1) from schema.table group by default.fun(field1)").show

It worked fine. But when I ran the same query again in the same shell, I got the following error:

    org.apache.spark.sql.AnalysisException: No handler for Hive UDF 'com.functions.Fun1': java.lang.ClassNotFoundException: com.functions.Fun1; line 1 pos 99
        at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.createFunction(HiveShim.scala:221)

I then started a new spark-shell with the JAR containing the UDF class (i.e. spark-shell --jars maprfs:///data/udf/udf-1.0.5.jar).
Apparently the JAR has to be supplied this way, because with it the function works consistently. The strange part is that without the JAR, the query works the first time I run it but fails on every run after that.
Any idea why this happens?
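
I also wondered whether loading the JAR into an already-running session would behave the same way, something like this with Spark SQL's ADD JAR command (an untested sketch on my side):

    // untested: make the UDF JAR visible to the current session
    sql("ADD JAR maprfs:///data/udf/udf-1.0.5.jar")
    // then retry the query that failed on the second run
    sql("select default.fun(field1) from schema.table group by default.fun(field1)").show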
I also ran into a different problem when trying the same UDF in PySpark: I saw the following error on some of the executors. The query eventually completes, but several executors throw this error:

    20/06/09 10:52:22 WARN TaskSetManager: Lost task 51.0 in stage 4.0 (TID 225, server1, executor 41): org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String com.functions.Fun1.evaluate(java.lang.String) on object com.functions.Fun1@52e56022 of class com.functions.Fun1 with arguments {test==:java.lang.String} of size 1
        at org.apache.hadoop.hive.ql.exec.FunctionRegistry.invoke(FunctionRegistry.java:981)
        at org.apache.spark.sql.hive.HiveSimpleUDF.eval(hiveUDFs.scala:91)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:188)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:355)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:106)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.ql.exec.FunctionRegistry.invoke(FunctionRegistry.java:957)
        ... 21 more
    Caused by: java.lang.NullPointerException
        at com.functions.Fun1.init(Fun1.java:33)
        at com.functions.Fun1.evaluate(Fun1.java:43)
        ... 26 more

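From the trace, the NullPointerException is thrown inside Fun1.init itself, so my guess is that the loading logic in the try block fails on those executors, the IOException gets swallowed, and pass is still null when pass.getBytes() runs. A more defensive version of init might look like this (just a sketch; the file path is a hypothetical stand-in for my elided loading logic):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    private static byte[] init() {
        try {
            // hypothetical stand-in for the real loading logic
            String pass = new String(Files.readAllBytes(Paths.get("/etc/fun1/secret")));
            return pass.getBytes();
        } catch (IOException e) {
            // fail fast instead of swallowing, so the real cause shows up in the executor log
            throw new RuntimeException("Fun1 init failed", e);
        }
    }
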
I tried passing the JAR to the pyspark shell as well, but it didn't help. I would like to know what causes the error above. Should I create a temporary function in Spark instead of using the one created in Hive? What is the right approach in this case?
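
In case the temporary-function route is the way to go, this is what I had in mind (untested; fun_tmp is just a name I picked):

    // Scala spark-shell: register the same class per session instead of through the metastore
    sql("CREATE TEMPORARY FUNCTION fun_tmp AS 'com.functions.Fun1' USING JAR 'maprfs:///data/udf/udf-1.0.5.jar'")
    sql("select fun_tmp(field1) from schema.table group by fun_tmp(field1)").show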
Thanks!
