python 连接ThreadPoolExecutor中可用的结果

k2fxgqgv  于 2023-02-21  发布在  Python
关注(0)|答案(1)|浏览(171)

我正在使用ThreadPoolExecutor同时运行一个API的多个并发查询。我希望在结果可用时返回结果,并对它们执行一些操作。我发现其中详细介绍了处理ThreadPoolExecutor返回结果的各种方法,这非常有帮助。但是,我的代码似乎仍在等待所有API查询完成,然后才对API的返回执行我请求的后续操作。当我的代码工作时,我觉得我遗漏了一些东西。这是最好的方式,甚至是正确的结构化方式吗?
下面是我的代码示例:

futures_lst = []
with ThreadPoolExecutor(max_workers=6) as executor:
  for i in df.index:
    future = executor.submit(api_get_function, single_api_input_variable)
    futures_lst.append(future)

for future in concurrent.futures.as_completed(futures_lst):
  # First run a function to format the results returned by the API
  result_df = format_columns_in(future.result())
  # Append the formatted results to a csv file as we go
  result_df.to_csv('result.csv', mode='a', index=True, header=True)
js4nwp54

js4nwp541#

你使用with语句,这就是为什么你会被阻止,直到所有的期货完成。with是特殊的语句,是与__enter____exit__方法。__enter__将被调用时,你进入with块,和__exit__方法将被自动调用(由解释器)与块结束。
当你查看ThreadPoolExecutor类的源代码时,你会看到这个__exit__函数(继承自Executor类):

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False

当我们看到关机方法的描述时,它说
如果wait为True,则此方法将不会返回,直到所有挂起的future执行完毕并且与执行器关联的资源已被释放
这就是为什么您的代码“仍在等待所有API查询完成,然后才执行后续操作”。
要解决这个问题,您有一些选择。第一个选择是,您可以删除with语句。通过这样做,您将不会被阻止

futures_lst = []
executor = ThreadPoolExecutor(max_workers=6)
for i in df.index:
    future = executor.submit(api_get_function, single_api_input_variable)
    futures_lst.append(future)

for future in concurrent.futures.as_completed(futures_lst):
  # First run a function to format the results returned by the API
  result_df = format_columns_in(future.result())
  # Append the formatted results to a csv file as we go
  result_df.to_csv('result.csv', mode='a', index=True, header=True)

但是上面的选项不是并发的,因为你在期货完成的时候循环,但是如果它们中的两个同时完成呢?
如果要同时(而非并行)执行此操作,还可以使用以下选项:

def custom_callback(future):
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go
    result_df.to_csv('result.csv', mode='a', index=True, header=True)

with ThreadPoolExecutor(max_workers=6) as executor:
    for i in df.index:
        future = executor.submit(api_get_function, single_api_input_variable)
        future.add_done_callback(custom_callback)

这种解决方案是并发的,如果任何线程完成了它的工作,那么它将调用custom_callback函数

但是这个选项不是线程安全的,因为你在追加文件,而我不知道format_columns_in在做什么。result_df.to_csv需要额外的锁定(format_columns_in也可以根据它的操作要求锁定)。你可以像这样线程安全地执行这个选项;

追加到文件也可以是线程安全的,更多细节请查看this

def custom_callback(future):
    
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go
    with global_lock:
        result_df.to_csv('result.csv', mode='a', index=True, header=True)

with ThreadPoolExecutor(max_workers=6) as executor:
    for i in df.index:
        future = executor.submit(api_get_function, single_api_input_variable)
        future.add_done_callback(custom_callback)

有关add_done_callback()方法的更多信息,请参见文档

相关问题