import time
from pyspark import SparkContext
sc = SparkContext()
def push_and_pop(rdd):
# two transformations: moves the head element to the tail
first = rdd.first()
return rdd.filter(
lambda obj: obj != first
).union(
sc.parallelize([first])
)
def serialize_and_deserialize(rdd):
# perform a collect() action to evaluate the rdd and create a new instance
return sc.parallelize(rdd.collect())
def do_test(serialize=False):
rdd = sc.parallelize(range(1000))
for i in xrange(25):
t0 = time.time()
rdd = push_and_pop(rdd)
if serialize:
rdd = serialize_and_deserialize(rdd)
print "%.3f" % (time.time() - t0)
do_test()
2条答案
按热度按时间mbyulnm01#
Evaluating Spark DataFrame in loop slows down with every iteration, all work done by controller中的示例代码
qoefvg9y2#
我已经修正了这个问题,每n次将df转换为rdd,然后再转换回df。现在代码运行得很快。但是我不明白到底是什么原因。如果我不做转换,解释计划在迭代过程中似乎上升得很快。这个修正也在《高性能Spark》一书中发布,其中有这个解决方案。
虽然Catalyst优化器非常强大,但它目前遇到的一个挑战是非常大的查询计划。这些查询计划往往是迭代算法的结果,如图算法或机器学习算法。一个简单的解决方法是将数据转换为RDD,然后在每次迭代结束时转换回DataFrame/Dataset