我正在使用ThreadPoolExecutor同时运行一个API的多个并发查询。我希望在结果可用时返回结果,并对它们执行一些操作。我发现其中详细介绍了处理ThreadPoolExecutor返回结果的各种方法,这非常有帮助。但是,我的代码似乎仍在等待所有API查询完成,然后才对API的返回执行我请求的后续操作。当我的代码工作时,我觉得我遗漏了一些东西。这是最好的方式,甚至是正确的结构化方式吗?
下面是我的代码示例:
futures_lst = []
with ThreadPoolExecutor(max_workers=6) as executor:
for i in df.index:
future = executor.submit(api_get_function, single_api_input_variable)
futures_lst.append(future)
for future in concurrent.futures.as_completed(futures_lst):
# First run a function to format the results returned by the API
result_df = format_columns_in(future.result())
# Append the formatted results to a csv file as we go
result_df.to_csv('result.csv', mode='a', index=True, header=True)
1条答案
按热度按时间js4nwp541#
你使用
with
语句,这就是为什么你会被阻止,直到所有的期货完成。with
是特殊的语句,是与__enter__
和__exit__
方法。__enter__
将被调用时,你进入with
块,和__exit__
方法将被自动调用(由解释器)与块结束。当你查看
ThreadPoolExecutor
类的源代码时,你会看到这个__exit__
函数(继承自Executor
类):当我们看到关机方法的描述时,它说
如果wait为True,则此方法将不会返回,直到所有挂起的future执行完毕并且与执行器关联的资源已被释放
这就是为什么您的代码“仍在等待所有API查询完成,然后才执行后续操作”。
要解决这个问题,您有一些选择。第一个选择是,您可以删除
with
语句。通过这样做,您将不会被阻止但是上面的选项不是并发的,因为你在期货完成的时候循环,但是如果它们中的两个同时完成呢?
如果要同时(而非并行)执行此操作,还可以使用以下选项:
这种解决方案是并发的,如果任何线程完成了它的工作,那么它将调用
custom_callback
函数但是这个选项不是线程安全的,因为你在追加文件,而我不知道
format_columns_in
在做什么。result_df.to_csv
需要额外的锁定(format_columns_in
也可以根据它的操作要求锁定)。你可以像这样线程安全地执行这个选项;追加到文件也可以是线程安全的,更多细节请查看this
有关
add_done_callback()
方法的更多信息,请参见文档