我尝试在java中的每个数据集中进行db调用。但是它总是发生这个错误。
我问题是两件事:
1.这是一种不寻常的db调用方式吗?
1.是否存在其他解决方案?
下面是我尝试过的代码:
public class SparkSql implements Serializable{
public void wordAddress(String word) {
Dataset<Row> recent =sparkSession.read().format("jdbc")
.option("url","jdbc:postgresql://"+ip+":"+port+"/"+db )
.option("driver","org.postgresql.Driver")
.option("query", sql)
.option("user", user)
.option("password", passwd)
.load();
recent.foreach(x->{
String temp = x.get(1).toString();
Dataset<Row> old = this.oldAddress(temp); -->another DB call in the method like above
System.out.println(old.count())
字符串
以下是错误日志:
ERROR] 14:06:53.789 Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:87)
at org.apache.spark.sql.execution.aggregate.AggUtils$.createAggregate(AggUtils.scala:41)
at org.apache.spark.sql.execution.aggregate.AggUtils$.planAggregateWithoutDistinct(AggUtils.scala:92)
at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:419)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
型
1条答案
按热度按时间xqkwcwgp1#
您不能从执行器创建新的Dataset或使用SparkContext/SparkSession,它们只存在于驱动程序节点上。
你的foreach被传递给你试图创建另一个数据集的执行器,捕获的函数包括对SparkSession的引用。
要使用Spark跨数据库执行查找,您需要首先创建两个Dataset,然后将它们连接起来。