python 我可以退出“as_completed”池执行器循环(取消所有剩余任务)吗?

ctrmrzij  于 2023-11-16  发布在  Python
关注(0)|答案(1)|浏览(99)

这个想法是运行一组任务,当它们完成时处理它们,一旦返回所需的结果,就取消剩余的任务。我不介意等待已经开始的任务完成,但应该禁止开始新的任务。
我试过调用pool.shutdown(cancel_futures=True),但它似乎不工作在这里:

  1. import time
  2. import multiprocessing
  3. import concurrent.futures
  4. import random
  5. filelist = ['001', '002', '003', '004', '005', '006', '007', '008', '009', '010']
  6. parallel = 2
  7. def worker(name, lock, index):
  8. with lock:
  9. index.value += 1
  10. pc = index.value*10
  11. print(f'starting: {name} ({pc})')
  12. d = random.randrange(1, 5)
  13. time.sleep(d)
  14. return pc == 60
  15. print('INIT')
  16. with multiprocessing.Manager() as manager:
  17. lock = manager.Lock()
  18. index = manager.Value('b', 0)
  19. if parallel > 0:
  20. with concurrent.futures.ProcessPoolExecutor(max_workers=parallel) as pool:
  21. tasks = [pool.submit(worker, filename, lock, index) for filename in filelist]
  22. for task in concurrent.futures.as_completed(tasks):
  23. rc = task.result()
  24. if rc:
  25. print('Found!')
  26. pool.shutdown(cancel_futures=True)
  27. break
  28. else:
  29. for filename in filelist:
  30. rc = worker(filename, lock, index)
  31. if rc:
  32. print('Found!')
  33. break
  34. print('END')

字符串
在串行模式(parallel = 0)下运行时,我得到了所需的结果:

  1. INIT
  2. starting: 001 (10)
  3. starting: 002 (20)
  4. starting: 003 (30)
  5. starting: 004 (40)
  6. starting: 005 (50)
  7. starting: 006 (60)
  8. Found!
  9. END


但当并行启动时,它会一直持续到最后:

  1. INIT
  2. starting: 001 (10)
  3. starting: 002 (20)
  4. starting: 003 (30)
  5. starting: 004 (40)
  6. starting: 005 (50)
  7. starting: 006 (60)
  8. Found!
  9. starting: 007 (70)
  10. starting: 008 (80)
  11. starting: 009 (90)
  12. END


那么,有没有办法退出as_completed循环呢?

wn9m85ua

wn9m85ua1#

是的。你可以用multiprocessing模块中的Pool类来做例子:

  1. from functools import partial
  2. import multiprocessing as mp
  3. import time
  4. def child_process(x, stop_event):
  5. # stopping condition
  6. if x == 4:
  7. print('stop event is set')
  8. stop_event.set()
  9. # do something else
  10. time.sleep(1)
  11. if __name__ == "__main__":
  12. print("main process start")
  13. # Create exit event
  14. exit_event = mp.Manager().Event()
  15. # Create Pool
  16. pool = mp.Pool(2)
  17. print('pool started')
  18. start = time.perf_counter()
  19. pool.map_async(partial(child_process, stop_event=exit_event), range(20))
  20. while 1:
  21. if exit_event.is_set():
  22. # close pool
  23. pool.terminate()
  24. pool.join()
  25. break
  26. print(f'pool interrupted. Time took: {time.perf_counter() - start:.2f} s.')
  27. # do something

字符串
输出:

  1. main process start
  2. pool started
  3. stop event is set
  4. pool interrupted. Time took: 1.17 s.

展开查看全部

相关问题