我有一个名为some_function(a,b)
的函数,它接受了两个参数a和B,并做了一些密集的事情,然后将一个Spark框架存储为输出。我想在some_function ->上设置超时,如果它花费的时间超过5分钟(300秒),我想中止,并移动到主for循环中的下一个'i'。下面的方法似乎不起作用,some_function(a,b)
似乎永远运行,而不是在5分钟后停止。可能队列有问题?
from multiprocessing import Process, Queue
def some_function(a,b):
//do some intensive process with a and b to generate spark dataframe
//store Spark dataframe output in Q
Q.put(some_spark_df)
def main():
for i in range(0,10):
Q = Queue()
a = //something
b = //something
p = Process(target=some_function, args = (a, b))
p.start()
#timeout value in seconds (300s = 5min)
p.join(300)
#if still running after above time
if p.is_alive():
#terminate
p.terminate()
p.join()
//get output from some_function(a,b)
result_df = Q.get()
1条答案
按热度按时间hfsqlsce1#
主进程无法成功
join
一个子进程,该子进程在将数据从队列中取出之前将数据放入multiprocessing.Queue
示例。主进程将无限期阻塞,因为子进程在队列清空之前无法终止。如果我们可以假设子进程在将结果放入队列时基本上已经完成了它的处理,因此已经终止或即将终止,那么主进程可以通过执行带有 timeout 值的
get
来检查结果是否已经放入队列。如果它能够检索数据,我们知道我们可以安全地join
子进程。否则,我们对子进程调用terminate
,然后我们可以安全地join
子进程,而不管在对Queue.get
和Process.terminate()
的不成功调用之间是否有数据被放入队列(也就是说,不应该有竞态条件)。或者,您可以 * 使用 * 托管 * 队列示例,它不会遇到与
multiprocessing.Queue
示例相同的问题,因为它试图join
一个子进程,该子进程已将数据放置在队列上,但由于队列在另一个进程(调用multiprocessing.Manager()
创建的进程)中退出,因此尚未检索到该数据。但性能可能会受到影响。图纸: