我知道spark使用惰性操作。我的问题是,当我读取csv文件作为spark Dataframe ,我做了一个像下面这样的转换,发生了什么事的数据在内存(RAM)后的行动操作。
df = spark.read.csv('example.csv')
df1 = df.withColumn("Y", df["X"])
df1.show()
在show
操作之后,内存中的任何中间结果(数据)发生了什么?它是否从内存中删除?换句话说,如果我df1.show第二次运行www.example.com(),Spark是否再次读取'example.csv'?
3条答案
按热度按时间bq3bfh9z1#
我认为,用一个说明性的例子可以很好地解释其中的一些概念:
输出为:
表示数据未从文件中重新读取-如果是,则会写入新行(
(c, 3)
)。如果您在调用
df.show()
之前导航到Spark UI(在本地运行时为localhost:4040
),您会注意到列出了一个用于阅读文件的作业沿着相应的DAG。第一次
这表示读入内存的 Dataframe 是一个操作(如中所示,不是延迟求值),并且除非通过另一个
spark.read.csv
调用明确告知,否则不会重新读取该文件。但是,在将 Dataframe 读入内存后执行的后续计算不会被缓存,除非使用
df.cache()
明确告知缓存。例如,如果我们要将以下内容添加到前面的代码片段中:将重复执行相同滤波器的计算,而如果我们添加
cache
调用:计算结果将被保存在内存中。这可以从计划的不同中看出(见下图)--特别是在
cache
的情况下,注意有一个InMemoryTableScan
步骤。第一次
当导航到时,它提供有关已缓存的查询以及首次计算该查询的操作的更多详细信息:
x1c4d 1x指令集
在图像中可能很难看到,但请注意查询信息末尾的“[Cached count at..”,因为
count
是在调用cache
之后调用的第一个操作函数(注意:调用cache
本身并不执行动作-它只是确保当调用诸如count
或show
之类的动作函数时,数据将被缓存以用于后续动作)。h6my8fg22#
内存中是否有任何中间结果(数据)?
你指的是什么中间结果?Spark会自动执行优化分析,如果有一些不必要的中间结果,它会打包一堆操作,而不需要计算出来。例如,在你的代码中,line1和line2直到第3行被执行时才会采取行动。这意味着直到第2行,
df
和df1
是一种“中间结果”,我猜你的意思是。但他们实际上甚至没有被计算过。所以他们根本不在内存中。文件读取操作也没有执行。但是,对于第3行则不同,因为第3行显式检查
df1
的值。然后,无法优化df1
,需要计算其值。并且,df1
将与其值一起存储在内存中。它是否从内存中删除?
由于第3行显式地查看
df1
的值,因此变量df1
将在内存中。Spark是否再次显示为“example.csv”?
不需要。再次调用
df1.show
时,Spark直接从内存中读取值。0kjbasz63#
DAG是基于对 Dataframe 的操作构建的。当遇到操作时,spark引擎执行DAG。在您的情况下,在df.show第二次调用www.example.com()时,spark将再次重新读取“example.csv”,因为 Dataframe 未被持久化/缓存。