Using head(1) on a DataFrame causes a timeout exception in Spark

yi0zb3m4 posted on 2021-05-27 in Spark

I am running a simple piece of Spark Scala code:

val df = spark.read.json("/home/files/data/date_20200811.json")
df.persist()
if (!df.head(1).isEmpty) {
  // keep rows whose status column equals the string literal 'OLD'
  val validDF = df.where("status = 'OLD'")
  validDF.write.json("/home/files/result")
} else {
  println("No data found")
}

When I run this code, it throws the following exception:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:100)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:89)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
        at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
        at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
        at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
        at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1596)

But if I replace df.head(1).isEmpty with df.count > 0, it works fine.


am46iovg:

It might just be a coincidence. Are you sure this snippet is the real culprit? I think something is missing.
Look at line 7 of the stack trace: at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107).
It means that somewhere a DataFrame is being broadcast for a join, and that broadcast did not complete within 300 seconds, which is the default spark.sql.broadcastTimeout.
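
Not part of the original answer, but a minimal sketch of two common workarounds, assuming the broadcast really does come from a join elsewhere in the job: raise spark.sql.broadcastTimeout, or disable automatic broadcast joins so the planner falls back to a sort-merge join. The application name and the 1200-second value below are illustrative only.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-timeout-workaround")               // illustrative name
  .config("spark.sql.broadcastTimeout", "1200")          // default is 300 seconds
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")  // -1 disables automatic broadcast joins
  .getOrCreate()

If the cluster is on an older 1.x release (as the class names in the stack trace suggest), the same keys can usually be set through sqlContext.setConf instead of the SparkSession builder.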
