在pyspark中优化while循环(在循环中持久化或缓存 Dataframe )

ef1yzkbh  于 2023-06-21  发布在  Spark
关注(0)|答案(1)|浏览(146)

我正在编写一个本质上是迭代的算法的PySpark实现。该算法的一部分涉及迭代策略,直到不能进行更多的改进(即,已经贪婪地达到局部最大值)。
函数optimize返回一个三列 Dataframe ,如下所示:
| id|电流值|最佳值|
| - -----|- -----|- -----|
| 0| 1| 1|
| 1| 0| 1|
该函数在while循环中使用,直到current_valuebest_value相同(这意味着不能再进行优化)。

# Init while loop
iterate = True

# Start iterating until optimization yields same result as before
while iterate:

    # Create (or overwrite) `df`
    df = optimizeAll(df2) # Uses `df2` as input
    df.persist().count()

    # Check stopping condition
    iterate = df.where('current_value != best_value').count() > 0

    # Update `df2` with latest results
    if iterate:
        df2 = df2.join(other=df, on='id', how='left') # <- Should I persist this?

当我手动将输入传递给它时,这个函数运行得非常快。然而,我注意到函数运行所需的时间随着迭代次数的增加而呈指数级增长。也就是说,第一次迭代以毫秒为单位运行,第二次迭代以秒为单位运行,最终每次运行需要10分钟。
This question建议,如果df没有缓存,则while循环将在每次迭代时从头开始运行。这是真的吗?
如果是这样,我应该持久化哪些对象?我知道在定义iterate时,持久化df将由count触发。但是,df2没有动作,所以即使我坚持它,它会不会使while循环每次都从头开始?同样,我是否应该在循环中的某个点取消持久化任一表?

afdcj2ne

afdcj2ne1#

如果您的资源足够,dfdf2都应该在您的情况下被持久化。
在上面的例子中,由于df2不是持久化的,当你每次调用df.persist().count()时,由于df2不是持久化的,只剩下沿袭信息,它将在第一次迭代中从第一个df2开始加入,这显然不是一个有效的方法。即使您只能持久化dfdf2,您也应该在示例中首先持久化df2。它应该给予你一个更一致的运行时间。
另外,不确定你使用的是哪个Spark版本,如果你使用的是Spark >= 3.0.0,你应该启用spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled,因为AQE会优化查询计划(https:spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution和https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html)。如果你使用的是Spark < 3.0.0,那么当你想要优化连接和分组策略时,你应该检查日志中的连接策略和数据倾斜情况。

相关问题