我有一个java客户机类(用作 spark-shell
)它响应一个api调用-让我们调用 SomeAPIRequester
.
在纯java中,它将返回我所需的结果,并提供以下示例代码-
SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123")) // result: {"id123": "item123"}
我想通过存储在sparkDataframe(scala)中的id的rdd以分布式方式调用这个api-
val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for
我把我的自定义项定义为-
val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
requester.getSomeItem(id)
})
把这个自定义项称为-
inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester) // requester as built with SomeAPIRequester.builder()....
// or directly with RDD ? udf, or a plain scala function ..
inputIdRdd.foreach{ id => test(id, requester) }
当我运行一个 .show()
或者 .take()
结果呢,我明白了 NullPointerException
在请求者java类上。
我还试着输入文字( lit
),我读到 typedLit
在scala中,但是我不能转换java Requester
分类到任何允许的 typedLit
scala中的类型。
有没有办法通过UDF调用这个java类对象并从api中获得结果?
编辑:
我还尝试初始化rdd的foreach块中的requester类-
inputIdRdd.foreach(x =>{
val apiRequester = SomeAPIRequester.builder()...(argPool).build()
try {
apiRequester.getSomeItem(x)
} catch {
case ex: Exception => println(ex.printStackTrace()); ""
}
})
但这不会返回响应-无法初始化类等。
谢谢!
1条答案
按热度按时间yws3nbqq1#
使用自定义类使用spark需要了解spark如何在引擎盖下工作。大学教师´不要将示例作为参数放入自定义项中。udf中的参数是从dataframe的行中提取的,在这种情况下可以理解空指针异常。您可以尝试以下选项:
首先将示例放在udf的范围内:
此时,如果可能的话,您需要将类标记为可序列化,否则将出现notserializableexception。
如果您的类由于来自第三方而不可序列化,您可以将示例标记为lazy transient val,如中所示https://mengdong.github.io/2016/08/16/spark-serialization-memo/ 或者https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d。
如果您在rdd领域工作,那么可以使用mappartitions为每个分区创建一个示例。