pyspark: Where on disk does Spark store its intermediate files?

3ks5zfa0  posted on 2023-01-08  in Spark

During the shuffle, the mappers dump their output to local disk and the reducers fetch it from there. Where exactly on disk are these files written? I am running a pyspark cluster on YARN.
What I have tried so far:
The locations where I think the intermediate files might be, in decreasing order of likelihood:

  1. hadoop/spark/tmp. According to the documentation this should come from the LOCAL_DIRS env variable defined by Yarn. However, after starting the cluster (I am passing --master yarn) I could not find any LOCAL_DIRS env variable using os.environ, but I can see SPARK_LOCAL_DIRS, which according to the documentation should only happen for mesos or standalone (any idea why?). In any case, my SPARK_LOCAL_DIRS is hadoop/spark/tmp.
  2. tmp. The default value of spark.local.dir.
  3. /home/username. I tried passing a custom value to spark.local.dir by starting pyspark with --conf spark.local.dir=/home/username.
  4. hadoop/yarn/nm-local-dir. The value of the yarn.nodemanager.local-dirs property in yarn-site.xml.
I ran the following code and checked for any intermediate files being created in the above 4 locations by navigating to each of them on the worker nodes.
The code I am running:
from pyspark import StorageLevel
df_sales = spark.read.load("gs://monsoon-credittech.appspot.com/spark_datasets/sales_parquet")
df_products = spark.read.load("gs://monsoon-credittech.appspot.com/spark_datasets/products_parquet")
df_merged = df_sales.join(df_products, df_sales.product_id == df_products.product_id, 'inner')
df_merged.persist(StorageLevel.DISK_ONLY)
df_merged.count()

No files are being created in any of the 4 locations I listed above.
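For reference, here is a minimal sketch of how those candidate locations could be inspected from inside a Spark task instead of navigating to every worker node by hand. The four paths are just the hypothetical candidates from the list above and would need to be adjusted to the actual cluster:

import os
import socket
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical candidate directories copied from the list above; adjust to your cluster.
candidate_dirs = ["hadoop/spark/tmp", "/tmp", "/home/username", "hadoop/yarn/nm-local-dir"]

def inspect_dirs(_):
    # Runs on an executor: report which candidate dirs exist there and a few of their entries.
    results = []
    for d in candidate_dirs:
        if os.path.isdir(d):
            results.append((socket.gethostname(), d, sorted(os.listdir(d))[:5]))
    return results

# Spread tasks over the executors and collect what they saw back on the driver.
for row in spark.sparkContext.parallelize(range(20), 20).flatMap(inspect_dirs).collect():
    print(row)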
As suggested in one of the answers, I tried to get the directory information in the terminal by doing the following:

  1. At the end of the log4j.properties file located at $SPARK_HOME/conf/, add log4j.logger.org.apache.spark.api.python.PythonGatewayServer=INFO. This did not help. (Screenshot of my terminal with logging set to INFO omitted.)

Where are Spark's intermediate files (mapper output, persisted data, etc.) stored?


iyfamqjs1#

Without getting into the weeds of the Spark source, maybe you can take a quick look at it live:

>>> irdd = spark.sparkContext.range(0,100,1,10)                                                                                                          
>>> def wherearemydirs(p):
...   import os
...   return os.getenv('LOCAL_DIRS')                                                                                                
... 
>>> 
>>> irdd.map(wherearemydirs).collect()
>>>

...will show the local dirs in the terminal:
/data/1/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,/data/10/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,/data/11/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,...
But yes, it will basically point to the parent directory (created by YARN) of the UUID-randomized subdirectories created by DiskBlockManager, as @KoedIt mentioned:

:
23/01/05 10:15:37 INFO storage.DiskBlockManager: Created local directory at /data/1/yarn/nm/usercache/<your-user-id>/appcache/application_xxxxxxxxx_xxxxxxx/blockmgr-d4df4512-d18b-4dcf-8197-4dfe781b526a
:
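Building on the same idea, here is a hedged variation that also globs for the blockmgr-* subdirectories that DiskBlockManager creates under each of those YARN-provided directories. It assumes a running spark session, as in the snippet above; the LOCAL_DIRS variable and the path layout are the YARN ones shown in this answer and will differ on other cluster managers:

import glob
import os

def where_are_my_blockmgr_dirs(_):
    # Runs on an executor under YARN: LOCAL_DIRS holds the comma-separated appcache directories.
    found = []
    for d in (os.getenv('LOCAL_DIRS') or '').split(','):
        # DiskBlockManager creates UUID-suffixed blockmgr-* subdirectories under each of them.
        found.extend(glob.glob(os.path.join(d, 'blockmgr-*')))
    return found

irdd = spark.sparkContext.range(0, 100, 1, 10)
print(sorted(set(irdd.flatMap(where_are_my_blockmgr_dirs).collect())))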

1szpjjfi2#

This will depend on your cluster setup and your Spark version, but you are more or less looking at the correct places.
For this explanation, I will be talking about Spark v3.3.1, which is the latest version at the time of writing.
There is an interesting method in org.apache.spark.util.Utils called getConfiguredLocalDirs, and it looks like this:

/**
   * Return the configured local directories where Spark can write files. This
   * method does not create any directories on its own, it only encapsulates the
   * logic of locating the local directories according to deployment mode.
   */
  def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
    val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
    if (isRunningInYarnContainer(conf)) {
      // If we are in yarn mode, systems can have different disk layouts so we must set it
      // to what Yarn on this system said was available. Note this assumes that Yarn has
      // created the directories already, and that they are secured so that only the
      // user has access to them.
      randomizeInPlace(getYarnLocalDirs(conf).split(","))
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
      conf.getenv("SPARK_LOCAL_DIRS").split(",")
    } else if (conf.getenv("MESOS_SANDBOX") != null && !shuffleServiceEnabled) {
      // Mesos already creates a directory per Mesos task. Spark should use that directory
      // instead so all temporary files are automatically cleaned up when the Mesos task ends.
      // Note that we don't want this if the shuffle service is enabled because we want to
      // continue to serve shuffle files after the executors that wrote them have already exited.
      Array(conf.getenv("MESOS_SANDBOX"))
    } else {
      if (conf.getenv("MESOS_SANDBOX") != null && shuffleServiceEnabled) {
        logInfo("MESOS_SANDBOX available but not using provided Mesos sandbox because " +
          s"${config.SHUFFLE_SERVICE_ENABLED.key} is enabled.")
      }
      // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
      // configuration to point to a secure directory. So create a subdirectory with restricted
      // permissions under each listed directory.
      conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
    }
  }

This is interesting because it lets us understand the order of precedence of the different configuration settings (a small PySpark sketch for checking these values on your own cluster follows the list below).

  • If running in Yarn, getYarnLocalDirs should give the local dir, which depends on the LOCAL_DIRS environment variable
  • If SPARK_EXECUTOR_DIRS is set, it will be one of those values
  • If SPARK_LOCAL_DIRS is set, it will be one of those values
  • If MESOS_SANDBOX is set and the shuffle service is not enabled, it will be MESOS_SANDBOX
  • If spark.local.dir is set, it will be that value
  • Otherwise (the catch-all) it will be java.io.tmpdir
  • Important: if you are using Kubernetes, all of this is ignored and a separate code path is used.
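Here is a rough PySpark sketch for checking those same settings on a running cluster. It only approximates what getConfiguredLocalDirs does: the environment variables are read on the executors, while spark.local.dir is read from the driver-side SparkConf, and it assumes a running spark session:

import os
import socket

# spark.local.dir (if set) lives in the driver-side SparkConf.
print("spark.local.dir:", spark.sparkContext.getConf().get("spark.local.dir", None))

def executor_dir_settings(_):
    # Check the environment variables in the same order getConfiguredLocalDirs does.
    return [(
        socket.gethostname(),
        os.getenv("LOCAL_DIRS"),            # YARN
        os.getenv("SPARK_EXECUTOR_DIRS"),
        os.getenv("SPARK_LOCAL_DIRS"),
        os.getenv("MESOS_SANDBOX"),
    )]

for row in set(spark.sparkContext.parallelize(range(8), 8).flatMap(executor_dir_settings).collect()):
    print(row)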

Now, how do we find this directory?

Luckily, there is a nice logging line in DiskBlockManager.createLocalDirs that prints out this directory if the logging level is INFO.
So, set your default logging level to INFO in log4j.properties, restart your spark application, and you should get a line saying something like
Created local directory at YOUR-DIR-HERE
