Pyspark -循环n次-每次循环逐渐变慢

kh212irz  于 2022-11-01  发布在  Spark
关注(0)|答案(2)|浏览(248)

因此,基本上我想在dataframe中循环n次,并在每个循环中应用一个函数(执行一个连接)。我的测试dataframe就像1000行,在每次迭代中,正好添加一列。前三个循环立即执行,从那时起,它变得非常非常慢。例如,第10个循环需要10多分钟。
我不明白为什么会发生这种情况,因为我的Dataframe不会以行的形式变大。例如,如果我用n=20调用函数,join会立即执行。但是当我迭代20次时,它很快就卡住了。
你知道什么可能导致这个问题吗?

mbyulnm0

mbyulnm01#

Evaluating Spark DataFrame in loop slows down with every iteration, all work done by controller中的示例代码

import time
from pyspark import SparkContext

sc = SparkContext()

def push_and_pop(rdd):
    # two transformations: moves the head element to the tail
    first = rdd.first()
    return rdd.filter(
        lambda obj: obj != first
    ).union(
        sc.parallelize([first])
    )

def serialize_and_deserialize(rdd):
    # perform a collect() action to evaluate the rdd and create a new instance
    return sc.parallelize(rdd.collect())

def do_test(serialize=False):
    rdd = sc.parallelize(range(1000))
    for i in xrange(25):
        t0 = time.time()
        rdd = push_and_pop(rdd)
        if serialize:
            rdd = serialize_and_deserialize(rdd)
        print "%.3f" % (time.time() - t0)

do_test()
qoefvg9y

qoefvg9y2#

我已经修正了这个问题,每n次将df转换为rdd,然后再转换回df。现在代码运行得很快。但是我不明白到底是什么原因。如果我不做转换,解释计划在迭代过程中似乎上升得很快。这个修正也在《高性能Spark》一书中发布,其中有这个解决方案。
虽然Catalyst优化器非常强大,但它目前遇到的一个挑战是非常大的查询计划。这些查询计划往往是迭代算法的结果,如图算法或机器学习算法。一个简单的解决方法是将数据转换为RDD,然后在每次迭代结束时转换回DataFrame/Dataset

相关问题