I recently came across this article, which suggests we can speed up `asyncio.gather(*coroutines)` by ordering the coroutines by their expected IO wait time, descending:

> Try to pass coroutines in asyncio.gather() ordered by the expected IO wait time descending. That is, the first argument should be the coroutine with the highest expected IO wait, and so on.

Article link: https://blog.sentry.io/2022/02/25/how-we-optimized-python-api-server-code-100x/
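My reading of that advice, as a minimal sketch (the `query` coroutine is a hypothetical stand-in; with pure `asyncio.sleep` waits the ordering changes nothing, this only illustrates the argument order):

```python
import asyncio

# Hypothetical stand-in for a query whose IO wait we can estimate up front.
async def query(io_wait: float) -> float:
    await asyncio.sleep(io_wait)
    return io_wait

async def main():
    # Per the article: highest expected IO wait first, lowest last.
    results = await asyncio.gather(query(5), query(3), query(1))
    print(results)  # results keep argument order: [5, 3, 1]

asyncio.run(main())
```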
I am trying to replicate this by creating coroutines that each run:

1. a CPU-bound task,
2. then an I/O-bound task (a network call),
3. then another CPU-bound task.

I then use httpx as the client to call a local Flask server whose endpoints respond after different delays.

For the I/O- and CPU-bound tasks:

a. I/O-bound task - I replace the article's SQL query with a network call to a Flask server running locally, with a custom sleep in each API.
b. CPU-bound task - a custom function that writes random numbers to a file.
However, whether I order the coroutines by increasing or decreasing I/O wait time, I can't find a consistent, noticeable difference in speed.
**Machine specs:** I'm on a MacBook M1 with Python 3.10.6. Let me know if you need more information.
For more detail, see the code I used to try out the concept:
**Flask app / app.py → run with `flask run`**
```python
import asyncio

from flask import Flask

# NOTE: async view functions require Flask's async extra:
#   pip install "flask[async]"
app = Flask(__name__)

@app.route('/delay/1000')
async def delay_1000():
    await asyncio.sleep(1)
    return 'delay 1000'

@app.route('/delay/3000')
async def delay_3000():
    await asyncio.sleep(3)
    return 'delay 3000'

@app.route('/delay/5000')
async def delay_5000():
    await asyncio.sleep(5)
    return 'delay 5000'
```
**test_delay.py - here I use httpx as the client to call the Flask server above**
```python
import asyncio
import random
import time

import httpx
import shortuuid

# NOTE: the ./text_files directory must exist before running.

def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
    # CPU-bound task: write n lines of d random numbers each to a file.
    start = time.time()
    with open(file_name, 'w') as f:
        for _ in range(n):
            nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
            f.write(' '.join(nums))
            f.write('\n')
    if display_duration:
        end = time.time()
        print(f"Duration for {file_name} is {end - start}")

async def make_req(client, url, name=None):
    print(f"{name} is running")
    start = time.time()
    # 1. CPU-bound task
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=500000, display_duration=True)
    # 2. I/O-bound task (network call)
    res = await client.get(url, timeout=10.0)
    # 3. CPU-bound task
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=500000, display_duration=True)
    end = time.time()
    print(f"duration of {name} call is {end - start}s")
    return res

def generate_coros(client):
    one_sec_url = "http://127.0.0.1:5000/delay/1000"
    three_sec_url = "http://127.0.0.1:5000/delay/3000"
    five_sec_url = "http://127.0.0.1:5000/delay/5000"
    one_sec_coro = make_req(client, one_sec_url, name='a')
    three_sec_coro = make_req(client, three_sec_url, name='b')
    five_sec_coro = make_req(client, five_sec_url, name='c')
    return one_sec_coro, three_sec_coro, five_sec_coro

async def all_slow():
    # Baseline (not called below): awaits the three coroutines sequentially.
    async with httpx.AsyncClient() as client:
        start = time.time()
        query_a, query_b, query_c = generate_coros(client)
        await query_a
        await query_b
        await query_c
        end = time.time()
        print(f"Duration for sequential awaits is {end - start}")

async def slow_then_fast():
    # gather() with the slowest expected IO wait first.
    async with httpx.AsyncClient() as client:
        start = time.time()
        query_a, query_b, query_c = generate_coros(client)
        results = await asyncio.gather(query_c, query_b, query_a)
        end = time.time()
        print(f"Duration for slow then fast is {end - start}")

async def fast_then_slow():
    # gather() with the fastest expected IO wait first.
    async with httpx.AsyncClient() as client:
        start = time.time()
        query_a, query_b, query_c = generate_coros(client)
        results = await asyncio.gather(query_a, query_b, query_c)
        end = time.time()
        print(f"Duration for fast then slow is {end - start}")

async def main():
    await slow_then_fast()
    await fast_then_slow()

asyncio.run(main())
```
As you can see, in `slow_then_fast` I put the slowest API call first, and in `fast_then_slow` I put it last, to test the theory above.
Here are my results/logs:
```
c is running
Duration for ./text_files/c7ujUaYgZyJUUhx5MEauf4.txt is 1.7627930641174316
b is running
Duration for ./text_files/Pz3jWKBqqUZWKvzqSnRBXj.txt is 1.8750889301300049
a is running
Duration for ./text_files/hwhuJtW8cwh4swLC9u4Ttb.txt is 1.7966642379760742
Duration for ./text_files/RRStJL5DBpB2dzggYJgDGf.txt is 1.7930638790130615
duration of a call is 4.613095998764038s
Duration for ./text_files/gRKwi92jSQkJAkGiumzN9B.txt is 2.0023791790008545
duration of b call is 8.693108081817627s
Duration for ./text_files/FYrVrLbtmYXWjcYyBYVBzu.txt is 1.848686933517456
duration of c call is 12.32311487197876s
Duration for slow then fast is 12.323269128799438
a is running
Duration for ./text_files/KVyHeuCQVQFHNYNHbyM5sc.txt is 1.7585160732269287
b is running
Duration for ./text_files/4FCgKBL2KCFeRTZGCGJyob.txt is 1.7451090812683105
c is running
Duration for ./text_files/i8AVjwhei8GtFGLjAk6Nei.txt is 1.749945878982544
Duration for ./text_files/cUSMAPx6czHb5k9LN3ZHt5.txt is 1.8131372928619385
duration of a call is 8.080067157745361s
Duration for ./text_files/VXVoFPCJQxn9tJtANC7oQL.txt is 1.7849910259246826
duration of b call is 8.291101932525635s
Duration for ./text_files/ge5duApKJQ9825DMaJeXoj.txt is 1.8465838432312012
duration of c call is 8.607009887695312s
Duration for fast then slow is 12.111705780029297
```
As we can see, `slow_then_fast` (12.32s) actually took slightly longer than `fast_then_slow` (12.11s).
I'm not sure whether my way of replicating this is flawed, or whether the issue has been addressed in later versions of Python/asyncio. As I'm new to Python performance profiling, I'd appreciate any help or advice on how to improve here. Thanks.
**Edit**
I have since tried another, more "direct" way of doing what the blog post suggests: simply putting the sleep into a SQL query issued to Postgres from within the Flask app:
```python
import argparse
import asyncio
import random
import time

import psycopg
import shortuuid
from flask import Flask

app = Flask(__name__)

def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
    # CPU-bound task: write n lines of d random numbers each to a file.
    start = time.time()
    with open(file_name, 'w') as f:
        for _ in range(n):
            nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
            f.write(' '.join(nums))
            f.write('\n')
    if display_duration:
        end = time.time()
        print(f"Duration for {file_name} is {end - start}")

async def pg_sleep(duration: int):
    # CPU-bound work, then an IO-bound SQL sleep, then CPU-bound work again.
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=300000, display_duration=True)
    async with await psycopg.AsyncConnection.connect(
        "dbname=postgres user=postgres password=12345 port=50000 host=0.0.0.0"
    ) as aconn:
        async with aconn.cursor() as acur:
            await acur.execute(f"SELECT pg_sleep({duration})")
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=300000, display_duration=True)

@app.route('/fast_to_slow')
async def fast_to_slow():
    start = time.time()
    await asyncio.gather(pg_sleep(1), pg_sleep(3), pg_sleep(5))
    end = time.time()
    delay_duration = f'delay {end - start} duration'
    print(delay_duration)
    return delay_duration

@app.route('/slow_to_fast')
async def slow_to_fast():
    start = time.time()
    await asyncio.gather(pg_sleep(5), pg_sleep(3), pg_sleep(1))
    end = time.time()
    delay_duration = f'delay {end - start} duration'
    print(delay_duration)
    return delay_duration

if __name__ == '__main__':
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--port")
    port = arg_parser.parse_args().port
    app.run(debug=True, port=port)
```
However, here too I don't see much difference in wall time:
```
Duration for ./text_files/SwrTSKNvB6d9gZdxgJGsZU.txt is 1.0756349563598633
Duration for ./text_files/PGjwZmUrGW9wWgbuYMsAih.txt is 1.0807139873504639
Duration for ./text_files/8WmZkViwjbw5cfPQzjuzBS.txt is 1.0467939376831055
Duration for ./text_files/BVgviCNZgPVkGbcHAR8oPG.txt is 1.0906238555908203
Duration for ./text_files/MqSAuH8BSYm2sWLFJ4ueq8.txt is 1.0889880657196045
Duration for ./text_files/USPppWYSjPftv8rZbiwDdV.txt is 1.096479892730713
delay 9.426467895507812 duration
127.0.0.1 - - [12/Mar/2023 11:08:54] "GET /slow_to_fast HTTP/1.1" 200 -
Duration for ./text_files/6ZS9SL38N5NKFaLdPjpLTN.txt is 1.1848421096801758
Duration for ./text_files/YsXka3m6DELFHXwAc4VSci.txt is 1.0680510997772217
Duration for ./text_files/3dFRbZazhncDC8vhEsjBf3.txt is 1.0647330284118652
Duration for ./text_files/HVfVkUnPLQnAC5V5wq6ACw.txt is 1.0934898853302002
Duration for ./text_files/oHAujuJ5PPqnqqLVS3nuMp.txt is 1.0853149890899658
Duration for ./text_files/hHRuoguqYbVsK6RuGE6HKf.txt is 1.0910439491271973
delay 9.443907022476196 duration
127.0.0.1 - - [12/Mar/2023 11:09:05] "GET /fast_to_slow HTTP/1.1" 200 -
```
1 Answer
> I am trying to replicate this by creating coroutines that each run: a CPU-bound task, then an I/O-bound task (a network call), then another CPU-bound task.
A quick refresher: asyncio is meant *specifically* for IO-bound tasks - tasks that typically force the CPU to sit idle for some stretch of time while it waits for an external reply.
Common examples are network calls, disk retrieval, and the like. In synchronous code, a simple web request involves your CPU sending information to your hardware network card and then sitting idle until it receives a reply. While the CPU idles, the network card sends the request and the target server receives it, processes it, and replies; only then does your network card hand the result back to the CPU, which picks up where it left off.
Asyncio lets you write code such that, once the CPU has handed the required information to the hardware network card, it moves on to the next task on its list, periodically checking whether the original task it created has received its reply.
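A small runnable sketch of that pattern (using `asyncio.sleep` as a stand-in for the network wait):

```python
import asyncio

async def fetch():
    # Stand-in for a network call: the CPU is free while we "wait".
    await asyncio.sleep(1)
    return "reply"

async def main():
    # Hand the request off, then immediately move on to other work.
    request = asyncio.create_task(fetch())
    print("request sent, doing other work...")
    await asyncio.sleep(0.5)  # other work the loop gets to run meanwhile
    reply = await request     # pick up where we left off
    print(f"got {reply!r} after ~1s total, not ~1.5s: the waits overlap")

asyncio.run(main())
```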
With that in mind, there is a very simple way to simulate synchronous and asynchronous IO in Python code: `time.sleep` (for CPU-bound tasks) and `asyncio.sleep` (for IO-bound tasks) - the former blocks the CPU, the latter does not. So we can write a very simple test program to check the theory this blog post proposes: is starting an IO-bound task and then a CPU-bound task faster than the other way around?
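Here is a minimal sketch of such a test, using `time.sleep(1)` to stand in for one second of CPU-bound work and `asyncio.sleep(1)` for one second of IO wait (the durations are arbitrary):

```python
import asyncio
import time

async def cpu_bound():
    time.sleep(1)  # blocks the event loop: nothing else runs meanwhile

async def io_bound():
    await asyncio.sleep(1)  # yields: the loop is free while we "wait"

async def main():
    # IO first: the 1s IO timer keeps counting down while cpu_bound blocks.
    start = time.perf_counter()
    await asyncio.gather(io_bound(), cpu_bound())
    print(f"IO first, then CPU: {time.perf_counter() - start:.2f}s")  # ~1s

    # CPU first: the IO wait only starts once the blocking work is done.
    start = time.perf_counter()
    await asyncio.gather(cpu_bound(), io_bound())
    print(f"CPU first, then IO: {time.perf_counter() - start:.2f}s")  # ~2s

asyncio.run(main())
```

The first `gather` should finish in roughly one second (the two waits overlap) and the second in roughly two (they run back to back).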
The answer is yes, and intuitively it should make sense: if you start an IO-bound task and then switch to a CPU-bound one, the IO-bound task is already "on its way" by the time you kick off the blocking CPU-bound work.