How can a Python concurrent Future signal when a whole list of Futures has completed?

Posted by ajsxfq5m on 2023-11-20 in Python

I have a function that submits several tasks to a ThreadPoolExecutor and returns the list of Futures created by those submissions:

from concurrent.futures import Future
from typing import List

# `executor` (a ThreadPoolExecutor) and `Task` (whose instances are callable) are defined elsewhere.

def submit_tasks() -> List[Future]:
    futures = []

    for i in range(10):
        future = executor.submit(Task())
        futures.append(future)

    return futures

def submit() -> Future:
    futures = submit_tasks()
    
    # I would like this function to return a single Future that clients can use to check
    # whether all futures in the list have completed. How to do that?

Python 3.8

disho6za, answer #1

I needed exactly the same thing and tried the accepted answer, but it does not do what was asked.

My solution

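Because Python's standard library does not provide this, I wrote a small helper class that combines several Future objects into a single Future. When that combined Future completes can be configured with the complete_when parameter.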

The class:

import concurrent.futures
from concurrent.futures import Future
from typing import Any


# Note: subscripting Future[...] needs Python 3.9+ and "X | None" needs 3.10+,
# so the base class is left unparameterized to also run on Python 3.8.
class CombinedFuture(Future):
    """
    This class provides "waiting" mechanisms similar to concurrent.futures.wait(...), except that there is no blocking wait.
    This class extends concurrent.futures.Future and thus it can be used like any other Future.
    You can use the .result() and .done() (and other) methods and also use this class with the aforementioned concurrent.futures.wait function.
    
    This class is especially useful when you want to combine multiple futures with and (&&) and or (||) logic.
    Example:
    Consider you have multiple parallel tasks (as futures) and a future that will be completed once your function should return (cancellation_future).
    You want to wait until all tasks finish normally or the cancellation_future is completed.
    With the standard Python library this is not possible, because concurrent.futures.wait(...) can either wait for all futures or for one.
    Using ALL_COMPLETED will never work, and FIRST_COMPLETED would also return if only one of the task_futures was completed.
    The following code uses CombinedFuture to solve this problem.
    
    .. code-block:: python
        
        def create_task() -> Future:
            # TODO add logic that completes this future
            return Future()
        
        # can be completed any time
        cancellation_future = Future()
        task_futures = [create_task(), create_task()]
        
        task_combined_future = CombinedFuture(*task_futures, complete_when=concurrent.futures.ALL_COMPLETED)
        # FIRST_COMPLETED: return as soon as either the cancellation or the combined task future is done
        done, not_done = concurrent.futures.wait([cancellation_future, task_combined_future], timeout=None, return_when=concurrent.futures.FIRST_COMPLETED)
        
        if cancellation_future in done:
            print("cancellation_future was completed")
        else:
            print("task_combined_future was completed")
    """

    def __init__(self, *futures: Future, complete_when: str = concurrent.futures.FIRST_COMPLETED) -> None:
        # complete_when takes the string constants FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED
        self.complete_when = complete_when
        self.futures = set(futures)
        self.completed_futures = set()
        
        super().__init__()

        if not self.futures:
            # nothing to wait for: complete immediately, otherwise this future would never complete
            self.set_result(None)

        for future in self.futures:
            # if the future is already done, the callback is invoked immediately
            future.add_done_callback(self._future_completed_callback)
    
    def _set_result_safe(self, result: Any) -> None:
        try:
            self.set_result(result)
        except concurrent.futures.InvalidStateError:
            # the result was already set. This happens when:
            # a second future completes, or several complete at "the same time",
            # or the user called set_result or changed the complete_when attribute (neither is supported)
            pass

    def _future_completed_callback(self, future: Future) -> None:
        self.completed_futures.add(future)
        
        if self.complete_when == concurrent.futures.FIRST_COMPLETED:
            # no count check required because we only need one and we just added our future
            self._set_result_safe(future)
            return
        elif self.complete_when == concurrent.futures.FIRST_EXCEPTION:
            # check cancelled() first: exception() raises CancelledError on a cancelled future
            if not future.cancelled() and future.exception(timeout=0) is not None:
                # future completed with exception
                self._set_result_safe(future)
        # else: should be concurrent.futures.ALL_COMPLETED
        # but we also want this logic in the FIRST_EXCEPTION case
        if self.completed_futures == self.futures:
            self._set_result_safe(None)


import concurrent.futures
from concurrent.futures import Future
# also import the CombinedFuture class defined above

def submit() -> Future:
    futures = submit_tasks()
    return CombinedFuture(*futures, complete_when=concurrent.futures.ALL_COMPLETED)


This works because CombinedFuture is a subclass of Future.
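For comparison, the core idea can be reduced to a much smaller function. This is a minimal sketch rather than the class above: the name `all_of` is hypothetical, and it only covers the ALL_COMPLETED case. It registers a done callback on every input future and completes a fresh Future once a lock-protected counter reaches zero; results, exceptions and cancellations all count as completion, as they do for concurrent.futures.wait.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def all_of(futures: List[Future]) -> Future:
    """Hypothetical helper: return a Future that completes (with result None)
    once every future in `futures` is done, by result, exception or cancellation."""
    combined: Future = Future()
    remaining = len(futures)
    lock = threading.Lock()

    if remaining == 0:
        # Nothing to wait for: complete immediately.
        combined.set_result(None)
        return combined

    def on_done(_: Future) -> None:
        nonlocal remaining
        with lock:
            remaining -= 1
            finished = remaining == 0
        # Only the callback that brings the counter to zero sets the result.
        if finished:
            combined.set_result(None)

    for f in futures:
        # If f is already done, on_done runs immediately in this thread.
        f.add_done_callback(on_done)
    return combined


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(pow, 2, i) for i in range(10)]
        combined = all_of(futures)
        combined.result(timeout=5)  # blocks only because we choose to here
        print(combined.done(), [f.result() for f in futures])
```

Because `all_of` returns a plain Future, callers can pass it around, poll it with done(), or hand it to concurrent.futures.wait. What it gives up compared with CombinedFuture is the FIRST_COMPLETED and FIRST_EXCEPTION modes.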

Why concurrent.futures.wait does not work

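The question explicitly asks for a Future that completes when all the futures have completed. concurrent.futures.wait does not return a Future. It performs a blocking wait until the given futures have completed (all of them, with the default return_when=ALL_COMPLETED) and then returns a named 2-tuple of sets of futures: done and not_done.
Usage: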

import concurrent.futures

done, not_done = concurrent.futures.wait([future1, future2, future3])
# done will contain all 3 futures (because return_when defaults to ALL_COMPLETED)
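A slightly fuller sketch, runnable on its own, makes the blocking behaviour and the tuple-of-sets return value visible. The two tasks are placeholders; a threading.Event keeps the second one running until it is released, so the output does not depend on timing.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

# Keeps the second task running until we call release.set().
release = threading.Event()

with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(lambda: "fast")
    slow = executor.submit(release.wait)  # blocks until release.set()

    # Blocks the calling thread until at least one future is done.
    done, not_done = wait([fast, slow], return_when=FIRST_COMPLETED)
    print(fast in done, slow in not_done)  # True True

    release.set()
    # The default return_when is ALL_COMPLETED: block until both are done.
    result = wait([fast, slow])
    print(type(result.done).__name__, len(result.done), len(result.not_done))  # set 2 0
```

In both calls the caller's thread is stuck inside wait until the condition holds, and what comes back is data about the futures, not a new Future that could be handed to someone else.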

Why you might want a Future instead of using concurrent.futures.wait

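A Future can be passed to other functions, which can then register a completion callback (via add_done_callback) or poll for completion (via done()) without blocking.
For further reasons, see the docstring of the CombinedFuture class.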
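Both consumption styles can be illustrated with a plain Future from a ThreadPoolExecutor; the callback name `on_complete` and the summing task are hypothetical. A CombinedFuture could be consumed the same way, since it is also a Future.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

seen = []


def on_complete(f: Future) -> None:
    # Runs in the worker thread, or immediately in the calling thread
    # if the future had already finished when the callback was added.
    seen.append(f.result())


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(sum, range(100))

    # Style 1: register a callback and carry on without blocking.
    future.add_done_callback(on_complete)

    # Style 2: poll done() and do other work in between.
    while not future.done():
        time.sleep(0.01)
    print("polled result:", future.result())  # polled result: 4950

# Leaving the with-block joins the worker thread, so the callback has run by now.
print("callback saw:", seen)  # callback saw: [4950]
```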

mznpcxlj, answer #2

Use concurrent.futures.wait: https://docs.python.org/3/library/concurrent.futures.html#module-functions

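from concurrent.futures import Future, wait
from typing import Set, Tuple

...

# wait() blocks until all futures are done and returns a (done, not_done)
# tuple of sets, not a Future, so the return annotation reflects that.
def submit() -> Tuple[Set[Future], Set[Future]]:
    return wait(submit_tasks())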
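If you want to keep wait but still hand callers a Future, one option (a sketch, not part of this answer) is to run the blocking wait call on a separate executor: executor.submit then returns a Future whose result is the (done, not_done) tuple. The pool names `executor` and `waiter` and the stand-in tasks are hypothetical. The waiting call gets its own pool so it can never occupy a worker that the tasks themselves need.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import List

# Hypothetical pools: one for the actual tasks, one used only for blocking waits.
executor = ThreadPoolExecutor(max_workers=4)
waiter = ThreadPoolExecutor(max_workers=1)


def submit_tasks() -> List[Future]:
    # Stand-in for the question's tasks: compute 2**i for i in 0..9.
    return [executor.submit(pow, 2, i) for i in range(10)]


def submit() -> Future:
    futures = submit_tasks()
    # wait() still blocks, but it blocks a thread of `waiter`, not the caller.
    # The returned Future's result is wait()'s (done, not_done) tuple.
    return waiter.submit(wait, futures)


combined = submit()
done, not_done = combined.result()
print(len(done), len(not_done))  # 10 0
```

The cost is one thread parked per pending call; with max_workers=1 on `waiter`, concurrent calls to submit() have their waits serialized. A callback-based combinator such as CombinedFuture avoids dedicating a thread to waiting.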

