python中的异步循环

blmhpbnm  于 2023-02-17  发布在  Python
关注(0)|答案(1)|浏览(175)

我是第一次使用python,我用的是最新版本。
我有一个for循环,执行时间很长,我希望并行运行它以提高性能。
经过一番研究,我认为async.ioasync for是最好的选择,但我还不明白如何使用这种技术转换for循环。
下面是我的代码:

def filter(my_list):
    res = []
    for _ in my_list:
        if check(_): # this takes a while to execute
            res.append(_)
        else:
            print(f'{_} removed')
    return res

如何优化这个程序的执行时间?
程序的其余部分应该保持不变,这意味着调用filter不应该改变,并且应该返回一个过滤后的列表。
谢谢

rks48beu

rks48beu1#

异步

除非修改check()
1.是一个异步函数
1.使用异步库/模块
1.主要与IO结合
你将不会从async获得任何性能。一个有效的async函数的例子。

async def check(item):
    await asyncio.sleep(1)
    return item > 5

如果您有一个异步检查函数,您可以这样做
需要10秒的系列版本

my_list = list(range(10))
res = [item for item in my_list if await check(item)]

与需要1秒的并行版本相比

import asyncio
my_list = list(range(10))
check_tasks = [check(_) for _ in my_list]
checked = await asyncio.gather(*check_tasks)
res = [item for keep, item in zip(checked, my_list) if keep]
print(res)

注意在创建check_tasks列表时,我们没有使用await,这是因为asyncio.gather接受协程。
另外,如果使用time.sleep(1)而不是asyncio.sleep(1),那么串行和并行运行时都是10 s。
如果您想限制在一个时间点执行的最大异步协程数,可以使用asyncio.信号量和modify check()。
例如-如果我们想在给定的时间有两个并行的-

sem = asyncio.Semaphore(2)
async def check(item):
    async with sem:
        await asyncio.sleep(1)
        return item > 5

耗时5秒

多处理版本

检查定义为

import time
def check(item):
    await time.sleep(1)
    return item > 5

串行运行的初始代码将是

my_list = list(range(10))
checked = map(check, my_list)
res = [item for keep, item in zip(checked, my_list) if keep]
print(res)

并行版本将是

from multiprocessing import Pool
my_list = list(range(10))
with Pool(5) as p:
    checked = p.map(check, my_list)
res = [item for keep, item in zip(checked, my_list) if keep]
print(res)

池(5)将在此处启动5个进程。请记住,启动一个进程的开销很大。

相关问题