我想了解更多关于hadoop的持久化策略。当我用磁盘保存Dataframe时,我的数据存储在哪里(路径/文件夹?我在哪里指定这个位置?
3mpgtkmj1#
对于简短的回答,我们可以看看有关 spark.local.dir :用于spark中“scratch”空间的目录,包括存储在磁盘上的Map输出文件和RDD。这应该是在一个快速,本地磁盘在您的系统。它也可以是不同磁盘上多个目录的逗号分隔列表。注意:在spark 1.0及更高版本中,这将被群集管理器设置的spark\u local\u dirs(standalone,mesos)或local\u dirs(yarn)环境变量覆盖。为了更深入的理解,我们可以看看代码:a DataFrame (这只是一个 Dataset[Row] )基于 RDD 它利用了相同的持久性机制。 RDD 我们把这个委托给 SparkContext ,这标志着它的持久性。这个任务实际上是由系统中的几个类来处理的 org.apache.spark.storage Package :一是 BlockManager 只需管理要持久化的数据块以及如何持久化的策略,就可以将实际持久化委托给 DiskStore (当然,当在磁盘上写入时)它表示一个用于写入的高级接口,而这个接口又具有 DiskBlockManager 更低级的操作。希望您现在已经了解了要查看的位置,这样我们就可以继续并了解数据实际持久化的位置,以及我们可以如何配置它: DiskBlockManager 调用帮助程序 Utils.getConfiguredLocalDirs ,出于实用性考虑,我将在这里复制(摘自链接的2.2.1版本,即撰写本文时的最新版本):
spark.local.dir
DataFrame
Dataset[Row]
RDD
SparkContext
org.apache.spark.storage
BlockManager
DiskStore
DiskBlockManager
Utils.getConfiguredLocalDirs
def getConfiguredLocalDirs(conf: SparkConf): Array[String] = { val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) if (isRunningInYarnContainer(conf)) { // If we are in yarn mode, systems can have different disk layouts so we must set it // to what Yarn on this system said was available. Note this assumes that Yarn has // created the directories already, and that they are secured so that only the // user has access to them. getYarnLocalDirs(conf).split(",") } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) { conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator) } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) { conf.getenv("SPARK_LOCAL_DIRS").split(",") } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) { // Mesos already creates a directory per Mesos task. Spark should use that directory // instead so all temporary files are automatically cleaned up when the Mesos task ends. // Note that we don't want this if the shuffle service is enabled because we want to // continue to serve shuffle files after the executors that wrote them have already exited. Array(conf.getenv("MESOS_DIRECTORY")) } else { if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) { logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " + "spark.shuffle.service.enabled is enabled.") } // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user // configuration to point to a secure directory. So create a subdirectory with restricted // permissions under each listed directory. conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",") }}
def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. Note this assumes that Yarn has
// created the directories already, and that they are secured so that only the
// user has access to them.
getYarnLocalDirs(conf).split(",")
} else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
} else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
conf.getenv("SPARK_LOCAL_DIRS").split(",")
} else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
// Mesos already creates a directory per Mesos task. Spark should use that directory
// instead so all temporary files are automatically cleaned up when the Mesos task ends.
// Note that we don't want this if the shuffle service is enabled because we want to
// continue to serve shuffle files after the executors that wrote them have already exited.
Array(conf.getenv("MESOS_DIRECTORY"))
} else {
if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
"spark.shuffle.service.enabled is enabled.")
}
// In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
// configuration to point to a secure directory. So create a subdirectory with restricted
// permissions under each listed directory.
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
代码,我相信,是相当不言自明的和注解良好的(并且完全匹配文档的内容):当在yarn上运行时,有一个特定的策略依赖于yarn容器的存储,在mesos中,它或者使用mesos沙盒(除非启用了shuffle服务),在所有其他情况下,它将转到下面设置的位置 spark.local.dir 或者 java.io.tmpdir (很可能是 /tmp/ ).所以,如果你只是在玩数据,很可能是存储在下面 /tmp/ ,否则这在很大程度上取决于您的环境和配置。
java.io.tmpdir
/tmp/
raogr8fs2#
总结一下我的Yarn环境:在@stefanobaghino的指导下,我可以在加载yarn配置的代码中更进一步。
val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("")
在yarn-default.xml中的yarn.nodemanager.local-dirs选项中设置我的问题的背景是,那是由错误引起的
2018-01-23 16:57:35,229 WARN org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /data/1/yarn/local error, used space above threshold of 98.5%, removing from list of valid directories
我的spark作业有时会被终止,我想知道在运行该作业时,这个磁盘是否也用于我的持久数据(这实际上是一个巨大的数量)。因此,在使用磁盘策略持久化数据时,这正是数据所在的文件夹。非常感谢您在这个问题上的指导!
2条答案
按热度按时间3mpgtkmj1#
对于简短的回答,我们可以看看有关
spark.local.dir
:用于spark中“scratch”空间的目录,包括存储在磁盘上的Map输出文件和RDD。这应该是在一个快速,本地磁盘在您的系统。它也可以是不同磁盘上多个目录的逗号分隔列表。注意:在spark 1.0及更高版本中,这将被群集管理器设置的spark\u local\u dirs(standalone,mesos)或local\u dirs(yarn)环境变量覆盖。
为了更深入的理解,我们可以看看代码:a
DataFrame
(这只是一个Dataset[Row]
)基于RDD
它利用了相同的持久性机制。RDD
我们把这个委托给SparkContext
,这标志着它的持久性。这个任务实际上是由系统中的几个类来处理的org.apache.spark.storage
Package :一是BlockManager
只需管理要持久化的数据块以及如何持久化的策略,就可以将实际持久化委托给DiskStore
(当然,当在磁盘上写入时)它表示一个用于写入的高级接口,而这个接口又具有DiskBlockManager
更低级的操作。希望您现在已经了解了要查看的位置,这样我们就可以继续并了解数据实际持久化的位置,以及我们可以如何配置它:
DiskBlockManager
调用帮助程序Utils.getConfiguredLocalDirs
,出于实用性考虑,我将在这里复制(摘自链接的2.2.1版本,即撰写本文时的最新版本):代码,我相信,是相当不言自明的和注解良好的(并且完全匹配文档的内容):当在yarn上运行时,有一个特定的策略依赖于yarn容器的存储,在mesos中,它或者使用mesos沙盒(除非启用了shuffle服务),在所有其他情况下,它将转到下面设置的位置
spark.local.dir
或者java.io.tmpdir
(很可能是/tmp/
).所以,如果你只是在玩数据,很可能是存储在下面
/tmp/
,否则这在很大程度上取决于您的环境和配置。raogr8fs2#
总结一下我的Yarn环境:
在@stefanobaghino的指导下,我可以在加载yarn配置的代码中更进一步。
在yarn-default.xml中的yarn.nodemanager.local-dirs选项中设置
我的问题的背景是,那是由错误引起的
我的spark作业有时会被终止,我想知道在运行该作业时,这个磁盘是否也用于我的持久数据(这实际上是一个巨大的数量)。
因此,在使用磁盘策略持久化数据时,这正是数据所在的文件夹。
非常感谢您在这个问题上的指导!