Apache Spark java中数据集每行的DB调用< row>

1dkrff03  于 2023-11-22  发布在  Apache
关注(0)|答案(1)|浏览(230)

我尝试在java中的每个数据集中进行db调用。但是它总是发生这个错误。
我问题是两件事:
1.这是一种不寻常的db调用方式吗?
1.是否存在其他解决方案?
下面是我尝试过的代码:

public class SparkSql implements Serializable{

     public void wordAddress(String word) {

      Dataset<Row> recent =sparkSession.read().format("jdbc")
                           .option("url","jdbc:postgresql://"+ip+":"+port+"/"+db )
                   .option("driver","org.postgresql.Driver")
                   .option("query", sql)
                   .option("user", user)
               .option("password", passwd)
                   .load();
     recent.foreach(x->{
            String temp = x.get(1).toString();
            Dataset<Row> old = this.oldAddress(temp); -->another DB call in the method like above 
                System.out.println(old.count())

字符串
以下是错误日志:

ERROR] 14:06:53.789 Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:87)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.createAggregate(AggUtils.scala:41)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.planAggregateWithoutDistinct(AggUtils.scala:92)
    at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:419)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)

xqkwcwgp

xqkwcwgp1#

您不能从执行器创建新的Dataset或使用SparkContext/SparkSession,它们只存在于驱动程序节点上。
你的foreach被传递给你试图创建另一个数据集的执行器,捕获的函数包括对SparkSession的引用。
要使用Spark跨数据库执行查找,您需要首先创建两个Dataset,然后将它们连接起来。

相关问题