我正在spark中运行一个迭代循环。每次迭代都会生成一个Dataframe,然后它就成为下一次迭代的输入。程序包含一个在每次迭代中调用的自定义项。在每次迭代之后,我将值赋给一个新的Dataframe,并将旧的Dataframe写入hdfs。新的Dataframe然后成为我下一次迭代的输入。我观察到的问题是,我的udf被多次调用。dag每一个timestep都会变长,udf会被多次调用。我怎样才能防止这种行为。由于性能原因,我不会使用缓存和/或检查点,同时我不能承受多次调用udf的代价。
我正在spark中运行一个迭代循环。每次迭代都会生成一个Dataframe,然后它就成为下一次迭代的输入。程序包含一个在每次迭代中调用的自定义项。在每次迭代之后,我将值赋给一个新的Dataframe,并将旧的Dataframe写入hdfs。新的Dataframe然后成为我下一次迭代的输入。我观察到的问题是,我的udf被多次调用。dag每一个timestep都会变长,udf会被多次调用。我怎样才能防止这种行为。由于性能原因,我不会使用缓存和/或检查点,同时我不能承受多次调用udf的代价。
1条答案
按热度按时间xqkwcwgp1#
我明白你的意思了伙计!
您可以使用:
上面的方法类似于checkpoint,但是它没有物理地保存Dataframe,而是缓存它。
这不会影响你的表现,我也遇到过同样的情况,我观察到你的表现有了很大的提高。
我在sparkml中也使用了这种方法,但问题是你需要注意你的内存空间容量,否则这种方法会起作用。