为什么增加spark.sql.shuffle.partitions会导致fetchfailedexception

gt0wga4j  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(1424)

在设置spark.sql.shuffle.partitions=2700时连接到表时遇到fetchfailedexception
但在设置spark.sql.shuffle.partitions=500时运行成功。
据我所知,增加shuffle.partitions会在shuffle read时减少每个任务中的数据。。
我错过什么了吗?
例外情况:

FetchFailed(BlockManagerId(699, nfjd-hadoop02-node120.jpushoa.com, 7337, None), shuffleId=4, mapId=59, reduceId=1140, message=
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCode

配置:

spark.executor.cores = 1
spark.dynamicAllocation.maxExecutors = 800
p8ekf7hl

p8ekf7hl1#

在读了shufflefetch的代码之后。
我遇到的问题是,shufflemaptask中的实际块太大,无法一次提取到内存中,如果我的shuffle分区超过2000个(根据spark.shuffle.minnumpartitionstohighlycompress),则driver中的块大小是一个平均块大小,当有倾斜数据时,它将小于实际大小。

相关问题