I am new to Spark and I am trying to understand Spark persistence.
When I call somedataframe.cache(), is the whole DataFrame cached as-is? For example, if somedataframe was produced by joining two tables, are the source tables cached, or only the final DataFrame? Does Spark have some mechanism that decides what to cache within a DataFrame? That is what I am trying to understand.
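For example, a minimal sketch of the kind of situation I mean (the orders/customers tables and the customer_id column are made up):

// Hypothetical scenario: a DataFrame produced by joining two source tables.
val joined = spark.table("orders").join(spark.table("customers"), "customer_id")
joined.cache()   // is only `joined` marked for caching, or the source tables too?
joined.count()   // cache() is lazy; an action is needed to actually materialize it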
Here is what I tried. I have two DataFrames, both built from tables: test (a Hive table) and src_data (a temporary view). I persisted both DataFrames and called an action on each so they would be held in memory. Code:
val new_data = spark.sql("select * from src_data where id not in (select distinct id from test)")
new_data.persist()       // default storage level MEMORY_AND_DISK
new_data.collect         // action to materialize the cache
new_data.createOrReplaceTempView("new_data")

val unchanged_data = spark.sql("select * from test where id not in (select id from changed_data)")
unchanged_data.persist()
unchanged_data.collect   // action to materialize the cache
unchanged_data.createTempView("unchanged_data")
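As a sanity check that persist() took effect, the storage level can also be inspected (assuming Spark 2.1+, where Dataset.storageLevel is available):

// Both should report StorageLevel(disk, memory, deserialized, 1 replicas),
// i.e. MEMORY_AND_DISK, the default for persist() on a Dataset.
println(new_data.storageLevel)
println(unchanged_data.storageLevel)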
I also ran the show() method on both DataFrames and got the expected results. To check whether the data was really persisted, I truncated the test table and ran show() on both DataFrames again. For the new_data DF I still got the same result as before truncating table test, but for the other DF I got an empty result set...
I don't understand why this happens. I can also see that before the truncate there were two RDDs under "Storage" in the Spark web UI, but after truncating and running show() again I only see one.
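For reference, the truncate-and-recheck sequence was roughly this (a sketch, not my exact session):

spark.sql("truncate table test")
new_data.show()         // still returns the rows from before the truncate
unchanged_data.show()   // unexpectedly comes back empty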
Here is the toDebugString output, in case it is useful:
unchanged_data.rdd.toDebugString
res2: String =
(4) MapPartitionsRDD[48] at rdd at <console>:29 []
| SQLExecutionRDD[47] at rdd at <console>:29 []
| MapPartitionsRDD[46] at rdd at <console>:29 []
| MapPartitionsRDD[45] at rdd at <console>:29 []
| MapPartitionsRDD[44] at rdd at <console>:29 []
| BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#197 = id#49) OR isnull((id#197 = id#49)))
:- Scan hive default.test [id#197, value#198, description#199], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#197, value#198, description#199]
+- BroadcastExchange IdentityBroadcastMode, [id=#114]
   +- InMemoryTableScan [id#49]
      +- InMemoryRelation [id#49, value#50, description#51], StorageLevel(disk, memory, deserialized, 1 replicas)
...
scala> new_data.rdd.toDebugString
res3: String =
(2) MapPartitionsRDD[53] at rdd at <console>:29 []
| SQLExecutionRDD[52] at rdd at <console>:29 []
| MapPartitionsRDD[51] at rdd at <console>:29 []
| MapPartitionsRDD[50] at rdd at <console>:29 []
| MapPartitionsRDD[49] at rdd at <console>:29 []
| BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#49 = id#126) OR isnull((id#49 = id#126)))
:- LocalTableScan [id#49, value#50, description#51]
+- BroadcastExchange IdentityBroadcastMode, [id=#83]
   +- *(2) HashAggregate(keys=[id#126], functions=[], output=[id#126])
      +- Exchange hashpartitioning(id#126, 200), true, [id=#79]
         +- *(1) HashAggregate(keys=[id#126], functions=[], output=[id#126])
            +- Scan hive default.test [id#126], HiveTableRelation `default`.`test`, org....
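In case it is relevant, my understanding is that one can also check whether a plan is actually served from the cache by looking for InMemoryTableScan / InMemoryRelation nodes in the physical plan, e.g.:

unchanged_data.explain(true)   // extended output; cached reads show up as InMemoryTableScan
new_data.explain(true)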