如何将复杂的java类对象作为参数传递给spark中的scala udf?

nhaq1z21  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(537)

我有一个java客户机类(用作 spark-shell )它响应一个api调用-让我们调用 SomeAPIRequester .
在纯java中,它将返回我所需的结果,并提供以下示例代码-

  1. SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
  2. System.out.println(requester.getSomeItem("id123")) // result: {"id123": "item123"}

我想通过存储在sparkDataframe(scala)中的id的rdd以分布式方式调用这个api-

  1. val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for

我把我的自定义项定义为-

  1. val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
  2. requester.getSomeItem(id)
  3. })

把这个自定义项称为-

  1. inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester) // requester as built with SomeAPIRequester.builder()....
  2. // or directly with RDD ? udf, or a plain scala function ..
  3. inputIdRdd.foreach{ id => test(id, requester) }

当我运行一个 .show() 或者 .take() 结果呢,我明白了 NullPointerException 在请求者java类上。
我还试着输入文字( lit ),我读到 typedLit 在scala中,但是我不能转换java Requester 分类到任何允许的 typedLit scala中的类型。
有没有办法通过UDF调用这个java类对象并从api中获得结果?

编辑:

我还尝试初始化rdd的foreach块中的requester类-

  1. inputIdRdd.foreach(x =>{
  2. val apiRequester = SomeAPIRequester.builder()...(argPool).build()
  3. try {
  4. apiRequester.getSomeItem(x)
  5. } catch {
  6. case ex: Exception => println(ex.printStackTrace()); ""
  7. }
  8. })

但这不会返回响应-无法初始化类等。
谢谢!

yws3nbqq

yws3nbqq1#

使用自定义类使用spark需要了解spark如何在引擎盖下工作。大学教师´不要将示例作为参数放入自定义项中。udf中的参数是从dataframe的行中提取的,在这种情况下可以理解空指针异常。您可以尝试以下选项:
首先将示例放在udf的范围内:

  1. val requester: SomeAPIRequester = ???
  2. val test: UserDefinedFunction = udf((id: String) => {
  3. requester.getSomeItem(id)
  4. })

此时,如果可能的话,您需要将类标记为可序列化,否则将出现notserializableexception。
如果您的类由于来自第三方而不可序列化,您可以将示例标记为lazy transient val,如中所示https://mengdong.github.io/2016/08/16/spark-serialization-memo/ 或者https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d。
如果您在rdd领域工作,那么可以使用mappartitions为每个分区创建一个示例。

相关问题