python 多处理进程池执行器阻塞提交函数

brc7rcf0  于 2023-05-16  发布在  Python
关注(0)|答案(1)|浏览(111)

我试图将executor.submit函数调用转换为阻塞函数,以便它等待ProcessPoolExecutor池有可用的worker。
换句话说,最初它应该打印1、2,然后等待5秒,打印3、4,等等。
我如何才能做到这一点?

import concurrent.futures
import multiprocessing
import time

def wait_f():
    time.sleep(5)
    return 1


if __name__ == '__main__':
    multiprocessing.freeze_support()
    global_results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = []
        for j in range(10):
            future = executor.submit(wait_f)
            futures.append(future)
            print(j)
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            global_results.append(result)
lsmd5eda

lsmd5eda1#

请记住,工人池模式是为您处理并发而设计的,因此您不必这样做。换句话说,你不应该担心调度比工作者更多的任务,因为池已经为你优化了任务流。
也就是说,你可以从这个答案中构建一个工作解决方案。

from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor

class TaskManager():
    def __init__(self, processes):
        self.workers = Semaphore(processes)
        self.executor = ProcessPoolExecutor(max_workers=processes)

    def new_task(self, function):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        future = self.executor.submit(function)
        future.add_done_callback(self.task_done)

    def task_done(self):
        """Called once task is done, releases the queue if blocked."""
        self.workers.release()

task_manager = TaskManager(2)
task_manager.new_task(wait_f)

相关问题