如何将复杂的java类对象作为参数传递给spark中的scala udf?

nhaq1z21  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(507)

我有一个java客户机类(用作 spark-shell )它响应一个api调用-让我们调用 SomeAPIRequester .
在纯java中,它将返回我所需的结果,并提供以下示例代码-

SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123"))  // result: {"id123": "item123"}

我想通过存储在sparkDataframe(scala)中的id的rdd以分布式方式调用这个api-

val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...))  // sample RDD of IDs i want to call the API for

我把我的自定义项定义为-

val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
   requester.getSomeItem(id)
})

把这个自定义项称为-

inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester)  // requester as built with SomeAPIRequester.builder()....

// or directly with RDD ? udf, or a plain scala function .. 
inputIdRdd.foreach{ id => test(id, requester) }

当我运行一个 .show() 或者 .take() 结果呢,我明白了 NullPointerException 在请求者java类上。
我还试着输入文字( lit ),我读到 typedLit 在scala中,但是我不能转换java Requester 分类到任何允许的 typedLit scala中的类型。
有没有办法通过UDF调用这个java类对象并从api中获得结果?

编辑:

我还尝试初始化rdd的foreach块中的requester类-

inputIdRdd.foreach(x =>{
  val apiRequester = SomeAPIRequester.builder()...(argPool).build()

  try {
    apiRequester.getSomeItem(x)
  } catch {
    case ex: Exception => println(ex.printStackTrace()); ""
  }
})

但这不会返回响应-无法初始化类等。
谢谢!

yws3nbqq

yws3nbqq1#

使用自定义类使用spark需要了解spark如何在引擎盖下工作。大学教师´不要将示例作为参数放入自定义项中。udf中的参数是从dataframe的行中提取的,在这种情况下可以理解空指针异常。您可以尝试以下选项:
首先将示例放在udf的范围内:

val requester: SomeAPIRequester = ???

val test: UserDefinedFunction = udf((id: String) => {
     requester.getSomeItem(id)
})

此时,如果可能的话,您需要将类标记为可序列化,否则将出现notserializableexception。
如果您的类由于来自第三方而不可序列化,您可以将示例标记为lazy transient val,如中所示https://mengdong.github.io/2016/08/16/spark-serialization-memo/ 或者https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d。
如果您在rdd领域工作,那么可以使用mappartitions为每个分区创建一个示例。

相关问题