我在dataproc(spark)中使用pandas udf在gcp上训练许多ml模型。主要思想是,我有一个分组变量,它表示Dataframe中的各种数据集,我运行如下操作:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):
#train model on grp_df
#evaluate model
#return metrics on
return (metrics)
result=df.groupBy('group_id').apply(test_train)
这可以正常工作,除非我使用非采样数据,其中返回的错误似乎与内存问题有关。这些消息(对我来说)很神秘,但如果我对它运行的数据进行采样,如果我不这样做,它就会失败。错误消息如下:
oserror:在大小为573373864的文件中读取越界(偏移量=631044336,大小=69873416)
或
因超过内存限制而被Yarn杀死的容器。使用24.5 gb的24 gb物理内存。考虑提升spark.yarn.executor.memoryoverhead或禁用yarn.nodemanager.vmem-check-enabled,因为yarn-4714。
我的问题是如何在集群中设置内存以使其工作?
我知道,每一组数据和正在运行的进程都需要完全放在执行器的内存中。我当前有一个4工作者群集,包含以下内容:
如果我认为最大组id中的最大数据大小需要150gb的内存,那么我似乎真的需要每台机器一次操作一个组id。我得到的速度至少是单个worker或vm的4倍。
如果我做了下面的操作,这是不是真的在每台机器上创建了一个执行器,它可以访问减去1到180gb内存的所有内核?因此,如果理论上最大的数据组可以在一个具有这么多ram的vm上工作,那么这个过程应该可以工作吗?
spark = SparkSession.builder \
.appName('test') \
.config('spark.executor.memory', '180g') \
.config('spark.executor.cores', '63') \
.config('spark.executor.instances', '1') \
.getOrCreate()
2条答案
按热度按时间stszievb1#
要解决Yarn开销问题,可以通过添加
.config('spark.yarn.executor.memoryOverhead','30g')
为了获得最大的并行性,建议将核心数保持在5,因为您可以增加执行器的数量。5hcedyr02#
让我们把答案分成三部分:
执行人人数
groupby操作
你的遗嘱执行人记忆
执行人人数
直接从spark文档:
所以,不。你只得到一个执行器,除非启用动态分配,否则它不会扩展。
您可以通过配置
spark.executor.instances
或者通过启用动态执行器分配,根据工作负载设置自动扩展。要启用动态分配,还必须启用shuffle服务,它允许您安全地删除执行器。这可以通过设置两个配置来完成:
spark.shuffle.service.enabled
至true
. 默认值为false。spark.dynamicAllocation.enabled
至true
. 默认值为false。子句
我观察到
group_by
在spark中使用散列聚合完成,这意味着给定x
分区的数目,以及唯一组的值大于x
,多个group by值将位于同一分区中。例如,假设groupu by column中的两个唯一值是
a1
以及a2
总行大小分别为100gib和150gib。如果它们落在不同的分区中,您的应用程序将正常运行,因为每个分区都将放入executor内存(180gib),这是内存处理所必需的,如果它们不放入剩余内存中,则剩余的分区将溢出到磁盘。但是,如果它们落在同一个分区中,您的分区将无法放入执行器内存(180gib<250gib),您将得到一个oom。
在这种情况下,配置
spark.default.parallelism
将数据分布在数量相当多的分区上,或者应用salt或其他技术来消除数据偏斜。如果您的数据不是太倾斜,那么您可以正确地说,只要您的执行器能够处理最大的groupby值,它就应该能够工作,因为您的数据将被均匀地分区,并且发生上述情况的可能性将很小。
另一点需要注意的是,由于您正在使用
group_by
这需要数据洗牌,您还应该打开洗牌服务。如果没有shuffle服务,每个执行者都必须在处理自己的工作的同时处理shuffle请求。执行器存储器
spark中的总执行器内存(实际执行器容器大小)是通过将为容器分配的执行器内存与分配的
memoryOverhead
. 这个memoryOverhead
比如虚拟机开销,内部字符串,其他本地开销等等,基于此,您可以根据您的数据配置适当大小的执行器。所以,当你设置
spark.executor.memory
至180GiB
,实际启动的执行者应该在198GiB
.