假设你有这样的东西:
big_table1 = spark.table('db.big_table1').cache()
big_table2 = spark.table('db.big_table2').cache()
big_table2 = spark.table('db.big_table3').cache()
# ... etc
从这些表中,你可以得到一些dfs...
output1 = (
# transformations here: filtering/joining etc the big tables
)
output2 = (
# transformations here: filtering/joining etc the big tables
)
# ... etc
然后,您需要合并所有输出:
final_output = (output1
.union(output2)
# ...etc
)
然后您要将结果保存到表中:
(final_output
.write
.saveAsTable('db.final_output')
)
据我所知,缓存是懒惰的,所以我们需要使用一个操作来强制该高速缓存。但是在上面的过程中,什么时候最好这样做呢?
你会...
final_output.count()
......就在您向表写入之前?
在这种情况下,spark必须遍历整个转换序列,然后将它们合并,然后返回计数。因此,它会说:“啊,你让我缓存big_tables --我会先缓存,然后我会使用内存中的东西来帮助我完成所有这些复杂的转换,并创建你的输出。”
或者它会说:“啊,你让我缓存这些大表。我会做这些大的转换,得到计数,然后我会把所有这些东西放在内存中,以防你再问我。”
换句话说,最好还是...
output1.count()
output2.count()
# ... etc
......甚至......
big_table1.count()
big_table2.count()
# ...etc
......上游,以确保提前缓存所有内容?
或者,在何处强制该高速缓存并不重要,只要它发生在向表写入之前就行了?
1条答案
按热度按时间dw1jzc5e1#
通常情况下,你知道你想在什么上做
.count
操作,所以你不能真的在那里选择。一般来说,尽量避免缓存非常大的 Dataframe /数据集(除非你确实需要),这将填满Spark存储内存(更多信息在这里),并为执行内存留下更少的空间。
因此,在您的示例中,我会查看缓存的内容,而不是计数的内容。
final_output.count()
感兴趣,我将缓存final_output
,而不缓存其他内容。outputX
的计数,我将缓存这些计数(因为您说您在其中进行过滤,并且它们不称为big_tableX
,我希望它们更小big_tableX
表的计数感兴趣,那么您当然需要缓存这些表。希望这对你有帮助!