How can I use Python's concurrent.futures to queue tasks across multiple processes, each with its own thread pool?

bvjveswy  posted on 2023-02-01  in  Python

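I am developing a library function that uses concurrent.futures to spread network I/O across multiple threads. That is slow for some workloads (large files), so I want to switch to multiple processes. However, multiple processes are not ideal for some other workloads either (many small files). I would like to split the difference and have multiple processes, each with its own thread pool.
The problem is job queueing: concurrent.futures does not seem to be set up to queue jobs properly for multiple processes that can each handle several jobs at once. Breaking the job list into chunks ahead of time is an option, but it would work much more smoothly if jobs flowed to each process asynchronously, as its individual threads finish a task.
How can I efficiently queue jobs across multiple processes and threads using this or a similar API? Apart from writing my own executor, is there an obvious solution I am overlooking? Or is there any prior art for a mixed process/thread executor?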

bf1o4zei  #1

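If I understand what you are trying to do, you basically have a lot of jobs that are suitable for multithreading, except that they also involve some CPU-intensive work. So your idea is to create multiple thread pools in multiple child processes so that there is less GIL contention. Of course, within any given child process the CPU-intensive code will still execute only serially (assuming it is Python bytecode), so it is not a perfect solution.
One approach is to create a very large multiprocessing pool (larger than the number of cores you have). There is a limit on how many processes you can create, and creating them is expensive. But since they spend most of their time waiting for I/O to complete, the I/O portion will execute concurrently.
A better approach is to create a multiprocessing pool whose executor is passed, along with the other required arguments, to the worker function of a multithreading pool. This is the inverse of what you were planning to do. When the worker function has CPU-intensive work to perform, it submits that work to the multiprocessing pool executor it was given and blocks until the result comes back. That way you get the best parallelism you can achieve for the number of cores you have. This would be my recommendation. For example: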
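As a rough illustration of the oversized-pool approach, the sketch below sizes a `ProcessPoolExecutor` at a multiple of the core count. The names `fetch`, `run_jobs` and `processes_per_core` are hypothetical, and `time.sleep` stands in for the real network call; the cap of 61 reflects the documented Windows limit on `max_workers`.

```python
import os
import time
from concurrent.futures import ProcessPoolExecutor


def fetch(job_id):
    # Hypothetical stand-in for a blocking network call.
    time.sleep(0.1)
    return job_id, os.getpid()


def run_jobs(job_ids, processes_per_core=4):
    # Deliberately oversubscribe: ask for more worker processes than cores.
    # ProcessPoolExecutor rejects max_workers > 61 on Windows, so cap it.
    max_workers = min(61, (os.cpu_count() or 1) * processes_per_core)
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # map() returns results in the same order as job_ids.
        return list(executor.map(fetch, job_ids))


if __name__ == '__main__':
    for job_id, pid in run_jobs(range(20)):
        print(job_id, pid)
```

Each job here occupies a whole process while it waits, which is why this scales worse than threads when there are many small jobs.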

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

def cpu_intensive(x):
    return x ** 2

def thread_worker(process_executor, x):
    import time

    # Do something with x
    ...
    time.sleep(.1) # simulate time taken
    future = process_executor.submit(cpu_intensive, x)
    squared = future.result() # Just for demo purposes
    return x, squared

if __name__ == '__main__':
    input_args = (100, 200, 300, 400, 500)
    with ProcessPoolExecutor() as process_executor:
        with ThreadPoolExecutor(10) as thread_executor:
            # Each input results in multiple threading jobs being created:
            futures = [
                thread_executor.submit(thread_worker, process_executor, input_arg + i)
                    for input_arg in input_args
                        for i in range(5)
            ]
            results = [future.result() for future in as_completed(futures)]
    print(results)

Prints:

[(204, 41616), (202, 40804), (203, 41209), (200, 40000), (201, 40401), (104, 10816), (103, 10609), (102, 10404), (101, 10201), (100, 10000), (402, 161604), (303, 91809), (302, 91204), (301, 90601), (400, 160000), (300, 90000), (304, 92416), (403, 162409), (401, 160801), (404, 163216), (500, 250000), (501, 251001), (504, 254016), (503, 253009), (502, 252004)]
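For the mixed workload described in the question, the same structure could offload only the large jobs and handle small ones directly in the thread. This is a sketch under assumptions, not part of the original answer: `LARGE_THRESHOLD`, `make_payload`, `checksum`, `handle` and `run` are all hypothetical names, and the payload is generated locally instead of downloaded.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

LARGE_THRESHOLD = 1_000  # hypothetical cutoff, in bytes


def make_payload(size):
    # Stand-in for the bytes a real download would return.
    return bytes(i % 251 for i in range(size))


def checksum(data):
    # Stand-in for CPU-heavy processing of a payload.
    return sum(data) % 65_521


def handle(process_executor, size):
    data = make_payload(size)
    if size >= LARGE_THRESHOLD:
        # Large payload: run the CPU-bound step in another process
        # and block this thread until the result comes back.
        return size, process_executor.submit(checksum, data).result()
    # Small payload: pickling it to another process costs more than it saves.
    return size, checksum(data)


def run(sizes):
    with ProcessPoolExecutor() as process_executor:
        with ThreadPoolExecutor(10) as thread_executor:
            futures = [
                thread_executor.submit(handle, process_executor, size)
                for size in sizes
            ]
            # Collect in submission order.
            return [future.result() for future in futures]


if __name__ == '__main__':
    print(run([10, 5_000, 20]))
```

The right threshold depends on how expensive the CPU-bound step is compared with the cost of pickling the data to another process, so it would have to be measured.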

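But if you want to stick with your original idea, or if for some reason the framework above does not fit your actual situation, then perhaps something like the following might work: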

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from multiprocessing import Queue
from queue import Empty

def init_pool_processes(q):
    global queue, thread_pool_executor

    queue = q
    thread_pool_executor = ThreadPoolExecutor(10) # or some appropriate pool size

def thread_worker(x):
    import time

    # Do something with x
    ...
    time.sleep(.1) # simulate time taken
    return x # Just for demo purposes

def process_worker(y):
    # This results in some number of threadpool jobs:
    futures = [thread_pool_executor.submit(thread_worker, y + i) for i in range(5)]
    for future in as_completed(futures):
        queue.put(future.result())

if __name__ == '__main__':
    results = []

    def drain_queue():
        # Move every result currently available on the queue into results.
        try:
            while True:
                results.append(queue.get_nowait())
        except Empty:
            pass

    input_args = (100, 200, 300, 400, 500)
    queue = Queue()
    with ProcessPoolExecutor(initializer=init_pool_processes, initargs=(queue,)) as executor:
        futures = [executor.submit(process_worker, input_arg) for input_arg in input_args]
        for future in as_completed(futures):
            # Every time a job submitted to the process pool completes we can
            # look for more results:
            drain_queue()
    # Pick up anything that was still in transit when the last job completed:
    drain_queue()
    print(results)

Prints:

[102, 201, 101, 203, 103, 202, 200, 100, 104, 204, 504, 301, 404, 502, 304, 403, 302, 501, 503, 500, 402, 303, 401, 300, 400]
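If the goal is for jobs to flow to whichever thread in whichever process is free, one possible pattern is a single shared job queue that every worker thread pulls from. The sketch below uses `multiprocessing` and `threading` directly instead of concurrent.futures; it is not from the original answer, all names in it are hypothetical, and it has no error handling (a job that raises would leave `run` waiting forever).

```python
import multiprocessing as mp
import threading
import time

STOP = None  # sentinel telling a worker thread to exit


def thread_loop(jobs, results):
    # Each thread pulls its next job as soon as it finishes the previous one.
    while True:
        job = jobs.get()
        if job is STOP:
            break
        time.sleep(0.01)  # stand-in for network I/O
        results.put((job, job ** 2))


def process_main(jobs, results, n_threads):
    # Runs in each child process: start a fixed set of threads and wait.
    threads = [
        threading.Thread(target=thread_loop, args=(jobs, results))
        for _ in range(n_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


def run(job_list, n_processes=2, n_threads=4):
    job_list = list(job_list)
    jobs, results = mp.Queue(), mp.Queue()
    processes = [
        mp.Process(target=process_main, args=(jobs, results, n_threads))
        for _ in range(n_processes)
    ]
    for process in processes:
        process.start()
    for job in job_list:
        jobs.put(job)
    # One sentinel per worker thread, queued after all real jobs.
    for _ in range(n_processes * n_threads):
        jobs.put(STOP)
    # Read one result per job before joining, so that no child is left
    # blocked while flushing data it has put on the results queue.
    collected = [results.get() for _ in job_list]
    for process in processes:
        process.join()
    return sorted(collected)


if __name__ == '__main__':
    print(run(range(10)))
```

Because every thread calls `jobs.get()` only when it is idle, no job list has to be split into chunks up front.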
