在pyspark中,我有一些操作需要循环并不断更改某些Dataframe。
df = spark.sql("select * from my_table")
for iter in range(0,num_iter):
for v in var_list:
//....
//Operations here that changes df
df = f1()
..
df = join(df, another dataset)
df = f2(df)
..
//There are some action here that should conclude DAG of this loop
我注意到随着循环的进行,性能会下降。最初只需要几秒钟;在几个循环之后,每个迭代都需要几分钟到几个小时。从Yarn上看,我们前进时,dag似乎在增长。在某个时刻,执行器只是失败了,错误显示了一个很长的dag
dag将像这样增长[仅示例,增长dag的不仅仅是连接操作][
]
在这种情况下,有没有办法提高绩效?是什么导致性能下降?如果是对象占用内存,有没有办法在每次循环后刷新它们?
每个循环的cache()(带有后续操作)是否是避免dag累积的好方法?我确实尝试过缓存,但性能仍然很差。
暂无答案!
目前还没有任何答案,快来回答吧!