pyspark-pandasudf on gcp-内存分配

tsm1rwdh  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(488)

我在dataproc(spark)中使用pandas udf在gcp上训练许多ml模型。主要思想是,我有一个分组变量,它表示Dataframe中的各种数据集,我运行如下操作:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):

  #train model on grp_df
  #evaluate model 
  #return metrics on 

    return (metrics)

result=df.groupBy('group_id').apply(test_train)

这可以正常工作,除非我使用非采样数据,其中返回的错误似乎与内存问题有关。这些消息(对我来说)很神秘,但如果我对它运行的数据进行采样,如果我不这样做,它就会失败。错误消息如下:
oserror:在大小为573373864的文件中读取越界(偏移量=631044336,大小=69873416)

因超过内存限制而被Yarn杀死的容器。使用24.5 gb的24 gb物理内存。考虑提升spark.yarn.executor.memoryoverhead或禁用yarn.nodemanager.vmem-check-enabled,因为yarn-4714。
我的问题是如何在集群中设置内存以使其工作?
我知道,每一组数据和正在运行的进程都需要完全放在执行器的内存中。我当前有一个4工作者群集,包含以下内容:

如果我认为最大组id中的最大数据大小需要150gb的内存,那么我似乎真的需要每台机器一次操作一个组id。我得到的速度至少是单个worker或vm的4倍。
如果我做了下面的操作,这是不是真的在每台机器上创建了一个执行器,它可以访问减去1到180gb内存的所有内核?因此,如果理论上最大的数据组可以在一个具有这么多ram的vm上工作,那么这个过程应该可以工作吗?

spark = SparkSession.builder \
  .appName('test') \
  .config('spark.executor.memory', '180g') \
  .config('spark.executor.cores', '63') \
  .config('spark.executor.instances', '1') \
  .getOrCreate()
stszievb

stszievb1#

要解决Yarn开销问题,可以通过添加 .config('spark.yarn.executor.memoryOverhead','30g') 为了获得最大的并行性,建议将核心数保持在5,因为您可以增加执行器的数量。

spark = SparkSession.builder \
  .appName('test') \
  .config('spark.executor.memory', '18g') \
  .config('spark.executor.cores', '5') \
  .config('spark.executor.instances', '12') \
  .getOrCreate()  

# or use dynamic resource allocation refer below config

spark = SparkSession.builder \
    .appName('test') \
   .config('spark.shuffle.service.enabled':'true')\
   .config('spark.dynamicAllocation.enabled':'true')\
   .getOrCreate()
5hcedyr0

5hcedyr02#

让我们把答案分成三部分:
执行人人数
groupby操作
你的遗嘱执行人记忆
执行人人数
直接从spark文档:

spark.executor.instances

 Initial number of executors to run if dynamic allocation is enabled.
 If `--num-executors` (or `spark.executor.instances`) is set and larger
 than this value, it will be used as the initial number of executors.

所以,不。你只得到一个执行器,除非启用动态分配,否则它不会扩展。
您可以通过配置 spark.executor.instances 或者通过启用动态执行器分配,根据工作负载设置自动扩展。
要启用动态分配,还必须启用shuffle服务,它允许您安全地删除执行器。这可以通过设置两个配置来完成: spark.shuffle.service.enabledtrue . 默认值为false。 spark.dynamicAllocation.enabledtrue . 默认值为false。
子句
我观察到 group_by 在spark中使用散列聚合完成,这意味着给定 x 分区的数目,以及唯一组的值大于 x ,多个group by值将位于同一分区中。
例如,假设groupu by column中的两个唯一值是 a1 以及 a2 总行大小分别为100gib和150gib。
如果它们落在不同的分区中,您的应用程序将正常运行,因为每个分区都将放入executor内存(180gib),这是内存处理所必需的,如果它们不放入剩余内存中,则剩余的分区将溢出到磁盘。但是,如果它们落在同一个分区中,您的分区将无法放入执行器内存(180gib<250gib),您将得到一个oom。
在这种情况下,配置 spark.default.parallelism 将数据分布在数量相当多的分区上,或者应用salt或其他技术来消除数据偏斜。
如果您的数据不是太倾斜,那么您可以正确地说,只要您的执行器能够处理最大的groupby值,它就应该能够工作,因为您的数据将被均匀地分区,并且发生上述情况的可能性将很小。
另一点需要注意的是,由于您正在使用 group_by 这需要数据洗牌,您还应该打开洗牌服务。如果没有shuffle服务,每个执行者都必须在处理自己的工作的同时处理shuffle请求。
执行器存储器
spark中的总执行器内存(实际执行器容器大小)是通过将为容器分配的执行器内存与分配的 memoryOverhead . 这个 memoryOverhead 比如虚拟机开销,内部字符串,其他本地开销等等,

Total executor memory = (spark.executor.memory + spark.executor.memoryOverhead)
spark.executor.memoryOverhead = max(executorMemory*0.10, 384 MiB)

基于此,您可以根据您的数据配置适当大小的执行器。所以,当你设置 spark.executor.memory180GiB ,实际启动的执行者应该在 198GiB .

相关问题