python-3.x: Unable to speed up asyncio.gather by ordering I/O-bound coroutines by descending wait time

Posted by rhfm7lfc on 2023-03-13 in Python

I recently came across an article claiming that asyncio.gather(*coroutines) can be sped up by ordering the coroutines by descending expected I/O wait time:

  1. Try to pass coroutines in asyncio.gather() ordered by the expected IO wait time descending. That is, the first argument should be the coroutine with the highest expected IO wait, and so on.

Link to the article: https://blog.sentry.io/2022/02/25/how-we-optimized-python-api-server-code-100x/
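As a point of reference, the tip quoted above could be sketched roughly as follows. This is only an illustration, not code from the article: `fetch`, `gather_slowest_first`, and the wait times are hypothetical stand-ins, with asyncio.sleep playing the role of the I/O wait.

```python
import asyncio


async def fetch(name: str, expected_wait: float) -> str:
    # Hypothetical stand-in for an I/O-bound call (a query or HTTP request).
    await asyncio.sleep(expected_wait)
    return name


async def gather_slowest_first(jobs: list[tuple[str, float]]) -> list[str]:
    # Sort by expected wait, largest first, then pass the coroutines to gather().
    ordered = sorted(jobs, key=lambda job: job[1], reverse=True)
    return await asyncio.gather(*(fetch(name, wait) for name, wait in ordered))


if __name__ == "__main__":
    jobs = [("a", 0.01), ("b", 0.03), ("c", 0.05)]
    # gather() returns results in argument order, so this prints ['c', 'b', 'a'].
    print(asyncio.run(gather_slowest_first(jobs)))
```

Note that asyncio.gather returns results in the order the awaitables were passed, so reordering the arguments also reorders the result list.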

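I am trying to replicate this by creating coroutines that each run:

  1. a CPU-bound task,
  2. then an I/O-bound task (a network call),
  3. followed by another CPU-bound task.

I then use httpx as the client to call a local Flask server that serves APIs with different time delays.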

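For the I/O-bound and CPU-bound tasks, I am doing the following:

a. I/O-bound task: replacing the SQL queries from the article with network calls to a Flask server running locally, where each API has its own custom sleep.
b. CPU-bound task: a custom function that writes random numbers to a file.

However, when I order the coroutines by increasing or decreasing I/O wait time, I cannot find a consistent and noticeable difference in speed.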

**Computer specs:** I am using Python 3.10.6 on a MacBook M1. Let me know if you need more information.

For more details, here is the code I used to try out the concept:

**Flask app, app.py (run with flask run)**

    import asyncio

    from flask import Flask

    app = Flask(__name__)


    # Async views require Flask to be installed with the async extra (flask[async]).
    @app.route('/delay/1000')
    async def delay_1000():
        await asyncio.sleep(1)
        return 'delay 1000'


    @app.route('/delay/3000')
    async def delay_3000():
        await asyncio.sleep(3)
        return 'delay 3000'


    @app.route('/delay/5000')
    async def delay_5000():
        await asyncio.sleep(5)
        return 'delay 5000'

test_delay.py: here I use httpx as the client to call the Flask server above

    import asyncio
    import random
    import time

    import httpx
    import shortuuid


    def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
        # CPU-bound stand-in: write n lines of d random numbers to file_name.
        # The target directory (./text_files) must already exist.
        start = time.time()
        with open(file_name, 'w') as f:
            for _ in range(n):
                nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
                f.write(' '.join(nums))
                f.write('\n')
        if display_duration:
            end = time.time()
            print(f"Duration for {file_name} is {end - start}")


    async def make_req(client, url, name=None):
        print(f"{name} is running")
        start = time.time()
        # 1. CPU task
        _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=500000, display_duration=True)
        # 2. I/O bound task (network call)
        res = await client.get(url, timeout=10.0)
        # 3. CPU task
        _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=500000, display_duration=True)
        end = time.time()
        print(f"duration of {name} call is {end - start}s")
        return res


    def generate_coros(client):
        one_sec_url = "http://127.0.0.1:5000/delay/1000"
        three_sec_url = "http://127.0.0.1:5000/delay/3000"
        five_sec_url = "http://127.0.0.1:5000/delay/5000"
        one_sec_coro = make_req(client, one_sec_url, name='a')
        three_sec_coro = make_req(client, three_sec_url, name='b')
        five_sec_coro = make_req(client, five_sec_url, name='c')
        return one_sec_coro, three_sec_coro, five_sec_coro


    async def all_slow():
        # Sequential baseline (defined but not called from main below).
        async with httpx.AsyncClient() as client:
            start = time.time()
            query_a, query_b, query_c = generate_coros(client)
            await query_a
            await query_b
            await query_c
            end = time.time()
            print(f"Duration for not awaiting is {end - start}")


    async def slow_then_fast():
        async with httpx.AsyncClient() as client:
            start = time.time()
            query_a, query_b, query_c = generate_coros(client)
            results = await asyncio.gather(query_c, query_b, query_a)
            end = time.time()
            print(f"Duration for slow then fast is {end - start}")


    async def fast_then_slow():
        async with httpx.AsyncClient() as client:
            start = time.time()
            query_a, query_b, query_c = generate_coros(client)
            results = await asyncio.gather(query_a, query_b, query_c)
            end = time.time()
            print(f"Duration for fast then slow is {end - start}")


    async def main():
        await slow_then_fast()
        await fast_then_slow()


    asyncio.run(main())

As you can see, to test the theory above I put the slowest API call first in slow_then_fast and last in fast_then_slow.
Here are my results/logs:

    c is running
    Duration for ./text_files/c7ujUaYgZyJUUhx5MEauf4.txt is 1.7627930641174316
    b is running
    Duration for ./text_files/Pz3jWKBqqUZWKvzqSnRBXj.txt is 1.8750889301300049
    a is running
    Duration for ./text_files/hwhuJtW8cwh4swLC9u4Ttb.txt is 1.7966642379760742
    Duration for ./text_files/RRStJL5DBpB2dzggYJgDGf.txt is 1.7930638790130615
    duration of a call is 4.613095998764038s
    Duration for ./text_files/gRKwi92jSQkJAkGiumzN9B.txt is 2.0023791790008545
    duration of b call is 8.693108081817627s
    Duration for ./text_files/FYrVrLbtmYXWjcYyBYVBzu.txt is 1.848686933517456
    duration of c call is 12.32311487197876s
    Duration for slow then fast is 12.323269128799438
    a is running
    Duration for ./text_files/KVyHeuCQVQFHNYNHbyM5sc.txt is 1.7585160732269287
    b is running
    Duration for ./text_files/4FCgKBL2KCFeRTZGCGJyob.txt is 1.7451090812683105
    c is running
    Duration for ./text_files/i8AVjwhei8GtFGLjAk6Nei.txt is 1.749945878982544
    Duration for ./text_files/cUSMAPx6czHb5k9LN3ZHt5.txt is 1.8131372928619385
    duration of a call is 8.080067157745361s
    Duration for ./text_files/VXVoFPCJQxn9tJtANC7oQL.txt is 1.7849910259246826
    duration of b call is 8.291101932525635s
    Duration for ./text_files/ge5duApKJQ9825DMaJeXoj.txt is 1.8465838432312012
    duration of c call is 8.607009887695312s
    Duration for fast then slow is 12.111705780029297

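As we can see, slow_then_fast took 12.32s, which is longer than the 12.11s taken by fast_then_slow.
I am not sure whether my approach to replicating this is flawed, or whether the issue has been addressed in later versions of Python/asyncio. I am still new to Python performance analysis, so I would appreciate any help or suggestions on how to improve here. Thanks.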
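Because the two totals above come from a single run each, one way to make such a comparison less sensitive to run-to-run noise is to repeat every ordering several times and compare medians. The following is only a sketch under assumptions: `fake_io`, `time_gather`, and `median_duration` are hypothetical helpers, and asyncio.sleep stands in for the httpx calls.

```python
import asyncio
import statistics
import time
from typing import Awaitable, Callable


async def fake_io(wait: float) -> None:
    # Hypothetical stand-in for a network call.
    await asyncio.sleep(wait)


async def time_gather(factories: list[Callable[[], Awaitable[None]]]) -> float:
    # Coroutine objects are single-use, so build fresh ones for every run.
    start = time.perf_counter()
    await asyncio.gather(*(make() for make in factories))
    return time.perf_counter() - start


def median_duration(factories, repeats: int = 5) -> float:
    # Run the same ordering several times and report the median wall time.
    samples = [asyncio.run(time_gather(factories)) for _ in range(repeats)]
    return statistics.median(samples)


if __name__ == "__main__":
    slow_first = [lambda: fake_io(0.05), lambda: fake_io(0.03), lambda: fake_io(0.01)]
    fast_first = list(reversed(slow_first))
    print(f"slow first: {median_duration(slow_first):.3f}s")
    print(f"fast first: {median_duration(fast_first):.3f}s")
```

Passing factories (callables that create the coroutine) rather than coroutine objects is a deliberate choice here, since an already-awaited coroutine cannot be awaited again.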

Edit

Since then, I have tried another, more "direct" way of doing what the blog post suggests: simply putting a sleep into an SQL query sent to Postgres from the Flask app:

    import argparse
    import asyncio
    import random
    import time

    import psycopg
    import shortuuid
    from flask import Flask

    app = Flask(__name__)


    def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
        # CPU-bound stand-in: write n lines of d random numbers to file_name.
        start = time.time()
        with open(file_name, 'w') as f:
            for _ in range(n):
                nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
                f.write(' '.join(nums))
                f.write('\n')
        if display_duration:
            end = time.time()
            print(f"Duration for {file_name} is {end - start}")


    async def pg_sleep(duration: int):
        # CPU task, then an I/O wait inside Postgres, then another CPU task.
        _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=300000, display_duration=True)
        async with await psycopg.AsyncConnection.connect(
            "dbname=postgres user=postgres password=12345 port=50000 host=0.0.0.0"
        ) as aconn:
            async with aconn.cursor() as acur:
                await acur.execute(
                    f"SELECT pg_sleep({duration})"
                )
        _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=300000, display_duration=True)


    @app.route('/fast_to_slow')
    async def fast_to_slow():
        start = time.time()
        await asyncio.gather(pg_sleep(1), pg_sleep(3), pg_sleep(5))
        end = time.time()
        delay_duration = f'delay {end - start} duration'
        print(delay_duration)
        return delay_duration


    @app.route('/slow_to_fast')
    async def slow_to_fast():
        start = time.time()
        await asyncio.gather(pg_sleep(5), pg_sleep(3), pg_sleep(1))
        end = time.time()
        delay_duration = f'delay {end - start} duration'
        print(delay_duration)
        return delay_duration


    if __name__ == '__main__':
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("--port")
        port = arg_parser.parse_args().port
        app.run(debug=True, port=port)

However, I do not see much of a difference in wall time here either:

    Duration for ./text_files/SwrTSKNvB6d9gZdxgJGsZU.txt is 1.0756349563598633
    Duration for ./text_files/PGjwZmUrGW9wWgbuYMsAih.txt is 1.0807139873504639
    Duration for ./text_files/8WmZkViwjbw5cfPQzjuzBS.txt is 1.0467939376831055
    Duration for ./text_files/BVgviCNZgPVkGbcHAR8oPG.txt is 1.0906238555908203
    Duration for ./text_files/MqSAuH8BSYm2sWLFJ4ueq8.txt is 1.0889880657196045
    Duration for ./text_files/USPppWYSjPftv8rZbiwDdV.txt is 1.096479892730713
    delay 9.426467895507812 duration
    127.0.0.1 - - [12/Mar/2023 11:08:54] "GET /slow_to_fast HTTP/1.1" 200 -
    Duration for ./text_files/6ZS9SL38N5NKFaLdPjpLTN.txt is 1.1848421096801758
    Duration for ./text_files/YsXka3m6DELFHXwAc4VSci.txt is 1.0680510997772217
    Duration for ./text_files/3dFRbZazhncDC8vhEsjBf3.txt is 1.0647330284118652
    Duration for ./text_files/HVfVkUnPLQnAC5V5wq6ACw.txt is 1.0934898853302002
    Duration for ./text_files/oHAujuJ5PPqnqqLVS3nuMp.txt is 1.0853149890899658
    Duration for ./text_files/hHRuoguqYbVsK6RuGE6HKf.txt is 1.0910439491271973
    delay 9.443907022476196 duration
    127.0.0.1 - - [12/Mar/2023 11:09:05] "GET /fast_to_slow HTTP/1.1" 200 -

Answer #1, by ygya80vv

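  "I am trying to replicate this by creating coroutines that run (1) a CPU-bound task, (2) then an I/O-bound task (a network call), (3) followed by another CPU-bound task."

Quick refresher: asyncio is *specifically* meant for IO-bound tasks, that is, tasks that typically force the CPU to sit idle for some time while it waits for an external reply.

Common examples are network calls, disk reads, and the like. In synchronous code, a simple web request involves your CPU sending information to your hardware network card and then staying idle until a reply arrives. While the CPU is idle, the network card sends the request, and the target server receives it, processes it, and replies; only then does your network card hand the reply back to the CPU, which picks up where it left off.

Asyncio lets you write code in such a way that, once the CPU has sent the required information to the network card, it moves on to the next task in its task list, periodically checking whether the original task has received its reply.

With that in mind, there is a very simple way to simulate synchronous and asynchronous IO in Python code: use time.sleep (standing in for a CPU-bound task) and asyncio.sleep (standing in for an IO-bound task). The first blocks the event loop so that nothing else can run; the second does not.

So we can write a very simple test program to check the theory proposed by the blog post. Is starting an IO-bound task and then a CPU-bound task faster than doing it the other way around?

    import asyncio
    import time
    import timeit


    async def io_bound():
        # Yields to the event loop while "waiting".
        await asyncio.sleep(1)


    async def cpu_bound():
        # Blocks the event loop for one second.
        time.sleep(1)


    async def t1():
        await asyncio.gather(cpu_bound(), io_bound(), io_bound())


    async def t2():
        await asyncio.gather(io_bound(), io_bound(), cpu_bound())


    run = timeit.timeit(lambda: asyncio.run(t1()), number=10)
    print(f"CPU Bound First: {run}")
    run = timeit.timeit(lambda: asyncio.run(t2()), number=10)
    print(f"I/O Bound First: {run}")

Output:

    CPU Bound First: 20.06762075
    I/O Bound First: 10.058394083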

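The answer is yes. Intuitively, this should make sense too. If you start an IO-bound task and then switch to a CPU-bound task, the IO-bound task is already "in flight" by the time you start the blocking CPU-bound task.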
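To make the "in flight" intuition visible, here is a minimal sketch that records when each task starts and finishes for both orderings. The `log` list, the `run` helper, and the shortened 0.2-second delays are assumptions added for illustration.

```python
import asyncio
import time


async def io_bound(log: list[str]) -> None:
    log.append("io started")
    await asyncio.sleep(0.2)  # yields to the event loop while "waiting"
    log.append("io finished")


async def cpu_bound(log: list[str]) -> None:
    log.append("cpu started")
    time.sleep(0.2)  # blocks the event loop, standing in for CPU work
    log.append("cpu finished")


async def run(order: str) -> tuple[float, list[str]]:
    # Returns (elapsed seconds, event log) for the requested ordering.
    log: list[str] = []
    start = time.perf_counter()
    if order == "io_first":
        await asyncio.gather(io_bound(log), cpu_bound(log))
    else:
        await asyncio.gather(cpu_bound(log), io_bound(log))
    return time.perf_counter() - start, log


if __name__ == "__main__":
    for order in ("io_first", "cpu_first"):
        elapsed, log = asyncio.run(run(order))
        print(f"{order}: {elapsed:.2f}s {log}")
```

With the IO task first, its wait overlaps the blocking work, so the log shows "io started" before "cpu started" and the total is roughly one delay. With the blocking task first, the IO wait cannot begin until the blocking work is done, so the two delays add up.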
