Spark --在spark中进行动作运算后内存中的数据发生了什么变化?

am46iovg  于 2022-11-01  发布在  Spark
关注(0)|答案(3)|浏览(141)

我知道spark使用惰性操作。我的问题是,当我读取csv文件作为spark Dataframe ,我做了一个像下面这样的转换,发生了什么事的数据在内存(RAM)后的行动操作。

df = spark.read.csv('example.csv')
df1 = df.withColumn("Y", df["X"])
df1.show()

show操作之后,内存中的任何中间结果(数据)发生了什么?它是否从内存中删除?换句话说,如果我df1.show第二次运行www.example.com(),Spark是否再次读取'example.csv'?

bq3bfh9z

bq3bfh9z1#

我认为,用一个说明性的例子可以很好地解释其中的一些概念:

from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    spark = SparkSession\
        .builder\
        .appName("test")\
        .getOrCreate()
    file_name = './test.csv'
    with open(file_name, 'w') as fp:
        fp.write('foo, bar')
        fp.write('\na, 1')
        fp.write('\nb, 2')
    df = spark.read.csv(file_name, header=True)
    df = df.withColumn('baz', F.lit('test'))
    df.show()
    with open(file_name, 'a') as fp:
        fp.write('\nc, 3')
    df.show()

输出为:

+---+----+----+
|foo| bar| baz|
+---+----+----+
|  a|   1|test|
|  b|   2|test|
+---+----+----+

+---+----+----+
|foo| bar| baz|
+---+----+----+
|  a|   1|test|
|  b|   2|test|
+---+----+----+

表示数据未从文件中重新读取-如果是,则会写入新行((c, 3))。
如果您在调用df.show()之前导航到Spark UI(在本地运行时为localhost:4040),您会注意到列出了一个用于阅读文件的作业沿着相应的DAG。
第一次
这表示读入内存的 Dataframe 是一个操作(如中所示,不是延迟求值),并且除非通过另一个spark.read.csv调用明确告知,否则不会重新读取该文件。
但是,在将 Dataframe 读入内存后执行的后续计算不会被缓存,除非使用df.cache()明确告知缓存。例如,如果我们要将以下内容添加到前面的代码片段中:

df.filter(F.col('foo') == 'a').count()
    df.filter(F.col('foo') == 'a').show()

将重复执行相同滤波器的计算,而如果我们添加cache调用:

df.filter(F.col('foo') == 'a').cache()
    df.filter(F.col('foo') == 'a').count()
    df.filter(F.col('foo') == 'a').show()

计算结果将被保存在内存中。这可以从计划的不同中看出(见下图)--特别是在cache的情况下,注意有一个InMemoryTableScan步骤。
第一次
当导航到时,它提供有关已缓存的查询以及首次计算该查询的操作的更多详细信息:
x1c4d 1x指令集
在图像中可能很难看到,但请注意查询信息末尾的“[Cached count at..”,因为count是在调用cache之后调用的第一个操作函数(注意:调用cache本身并不执行动作-它只是确保当调用诸如countshow之类的动作函数时,数据将被缓存以用于后续动作)。

h6my8fg2

h6my8fg22#

内存中是否有任何中间结果(数据)?
你指的是什么中间结果?Spark会自动执行优化分析,如果有一些不必要的中间结果,它会打包一堆操作,而不需要计算出来。例如,在你的代码中,line1和line2直到第3行被执行时才会采取行动。这意味着直到第2行,dfdf1是一种“中间结果”,我猜你的意思是。但他们实际上甚至没有被计算过。所以他们根本不在内存中。文件读取操作也没有执行。
但是,对于第3行则不同,因为第3行显式检查df1的值。然后,无法优化df1,需要计算其值。并且,df1将与其值一起存储在内存中。
它是否从内存中删除?
由于第3行显式地查看df1的值,因此变量df1将在内存中。
Spark是否再次显示为“example.csv”?
不需要。再次调用df1.show时,Spark直接从内存中读取值。

0kjbasz6

0kjbasz63#

DAG是基于对 Dataframe 的操作构建的。当遇到操作时,spark引擎执行DAG。在您的情况下,在df.show第二次调用www.example.com()时,spark将再次重新读取“example.csv”,因为 Dataframe 未被持久化/缓存。

相关问题