遇到SparkException“无法广播大于8GB的表”

pdtvr36n  于 2023-04-07  发布在  Apache
关注(0)|答案(2)|浏览(815)

我使用Spark 2.2.0进行数据处理。我使用Dataframe.join将2个 Dataframe 连接在一起,但我遇到了以下堆栈跟踪:

18/03/29 11:27:06 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
18/03/29 11:27:09 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:126)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    ...........
Caused by: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 10 GB
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:86)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我在互联网上搜索这个错误,但没有得到任何提示或解决方案如何解决这个问题。
Spark会自动广播Dataframe作为join的一部分吗?我对这个8GB的限制感到非常惊讶,因为我会认为Dataframe支持“大数据”,8GB根本不是很大。
非常感谢你提前为您的建议在这方面。

yrefmtwq

yrefmtwq1#

经过一番阅读,我尝试禁用自动广播,它似乎工作。更改Spark配置:

'spark.sql.autoBroadcastJoinThreshold': '-1'
qoefvg9y

qoefvg9y2#

目前,spark有一个硬性限制,广播变量的大小应该小于8GB。请看这里。
一般来说,8GB已经足够大了。如果你考虑到你正在运行一个有100个executors的作业,spark driver需要将8GB的数据发送到100个Node,从而产生800GB的网络流量。如果你不广播,而使用简单连接,这个成本会少得多。
如果您确实需要更改autoBroadcast限制,可以使用以下配置

spark.sql.autoBroadcastJoinThreshold: -1

相关问题