python-3.x 无法通过按等待时间降序排列I/O绑定协程来提高asyncio.gather速度

rhfm7lfc  于 2023-03-13  发布在  Python


  1. Try to pass coroutines in asyncio.gather() ordered by the expected IO wait time descending. That is, the first argument should be the coroutine with the highest expected IO wait, and so on.




  1. CPU限制任务,
  2. I/O绑定任务(SQL查询),
  3. CPU限制任务。


a. I/O绑定任务——用对本地运行的Flask服务器的网络调用来替换上述SQL查询,不同的API带有不同的自定义休眠时间。
b. CPU限制任务——用一个将随机数写入文件的自定义函数来模拟。

**计算机规格:**我在MacBook M1上使用Python 3.10.6。如果您需要更多信息,请告诉我。


**Flask应用程序(通过 `flask run` 运行)**

  """Flask test server whose endpoints reply after a fixed artificial delay.

  Start it with ``flask run``. Async view functions need Flask's async extra
  (``pip install "flask[async]"``).
  """
  import asyncio
  import time

  from flask import Flask

  app = Flask(__name__)


  @app.route('/delay/1000')
  async def delay_1000():
      """Reply after an awaited 1-second sleep."""
      await asyncio.sleep(1)
      return 'delay 1000'


  @app.route('/delay/3000')
  async def delay_3000():
      """Reply after an awaited 3-second sleep."""
      await asyncio.sleep(3)
      return 'delay 3000'


  @app.route('/delay/5000')
  async def delay_5000():
      """Reply after an awaited 5-second sleep."""
      await asyncio.sleep(5)
      return 'delay 5000'


  """Compare asyncio.gather() orderings for coroutines mixing CPU work and I/O.

  Each request coroutine does: file write (CPU), HTTP call (I/O), file write
  (CPU). The script times gather() with the slowest request listed first and
  with the fastest request listed first.
  """
  import asyncio
  import random
  import time

  import httpx
  import shortuuid

  # The endpoint URLs were blank in this listing, which cannot be requested.
  # NOTE(review): assumes the delay server runs at the `flask run` default
  # address; change this if it listens elsewhere.
  BASE_URL = "http://127.0.0.1:5000"


  def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
      """Write ``n`` lines of ``d`` random numbers to ``file_name``.

      Used as a stand-in for CPU-bound work. It is a plain synchronous
      function, so it never yields to the event loop while it runs.

      Args:
          file_name: Path of the file to (over)write.
          d: How many space-separated numbers go on each line.
          n: How many lines to write.
          display_duration: When true, print the elapsed wall-clock time.
      """
      start = time.time()
      with open(file_name, 'w') as f:
          for _ in range(n):
              nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
              f.write(' '.join(nums))
              f.write('\n')
      if display_duration:
          end = time.time()
          print(f"Duration for {file_name} is {end - start}")


  async def make_req(client, url, name=None):
      """Run CPU work, then GET ``url`` with ``client``, then CPU work again.

      Args:
          client: Object with an awaitable ``get(url, timeout=...)`` method
              (an ``httpx.AsyncClient`` in this script).
          url: Address to request.
          name: Label used in the progress messages.

      Returns:
          Whatever ``client.get`` returned.
      """
      print(f"{name} is running")
      start = time.time()
      # 1. CPU task
      _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=500000, display_duration=True)
      # 2. I/O bound task (network call) - the only point where this
      # coroutine yields control to the other gathered coroutines.
      res = await client.get(url, timeout=10.0)
      # 3. CPU task
      _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=500000, display_duration=True)
      end = time.time()
      print(f"duration of {name} call is {end - start}s")
      return res


  def generate_coros(client):
      """Create the 1 s, 3 s and 5 s request coroutines, in that order.

      The coroutines are only created here; nothing runs until they are
      awaited or passed to ``asyncio.gather``.
      """
      one_sec_url = f"{BASE_URL}/delay/1000"
      three_sec_url = f"{BASE_URL}/delay/3000"
      five_sec_url = f"{BASE_URL}/delay/5000"
      one_sec_coro = make_req(client, one_sec_url, name='a')
      three_sec_coro = make_req(client, three_sec_url, name='b')
      five_sec_coro = make_req(client, five_sec_url, name='c')
      return one_sec_coro, three_sec_coro, five_sec_coro


  async def all_slow():
      """Baseline: await the three requests one after another."""
      async with httpx.AsyncClient() as client:
          start = time.time()
          query_a, query_b, query_c = generate_coros(client)
          await query_a
          await query_b
          await query_c
          end = time.time()
          print(f"Duration for not awaiting is {end - start}")


  async def slow_then_fast():
      """Gather the requests with the longest expected wait listed first."""
      async with httpx.AsyncClient() as client:
          start = time.time()
          query_a, query_b, query_c = generate_coros(client)
          results = await asyncio.gather(query_c, query_b, query_a)
          end = time.time()
          print(f"Duration for slow then fast is {end - start}")


  async def fast_then_slow():
      """Gather the requests with the shortest expected wait listed first."""
      async with httpx.AsyncClient() as client:
          start = time.time()
          query_a, query_b, query_c = generate_coros(client)
          results = await asyncio.gather(query_a, query_b, query_c)
          end = time.time()
          print(f"Duration for fast then slow is {end - start}")


  async def main():
      """Run both gather orderings back to back."""
      await slow_then_fast()
      await fast_then_slow()


  # The listing defined main() but never started it; the entry point was
  # presumably lost in extraction.
  if __name__ == "__main__":
      asyncio.run(main())


  1. c is running
  2. Duration for ./text_files/c7ujUaYgZyJUUhx5MEauf4.txt is 1.7627930641174316
  3. b is running
  4. Duration for ./text_files/Pz3jWKBqqUZWKvzqSnRBXj.txt is 1.8750889301300049
  5. a is running
  6. Duration for ./text_files/hwhuJtW8cwh4swLC9u4Ttb.txt is 1.7966642379760742
  7. Duration for ./text_files/RRStJL5DBpB2dzggYJgDGf.txt is 1.7930638790130615
  8. duration of a call is 4.613095998764038s
  9. Duration for ./text_files/gRKwi92jSQkJAkGiumzN9B.txt is 2.0023791790008545
  10. duration of b call is 8.693108081817627s
  11. Duration for ./text_files/FYrVrLbtmYXWjcYyBYVBzu.txt is 1.848686933517456
  12. duration of c call is 12.32311487197876s
  13. Duration for slow then fast is 12.323269128799438
  14. a is running
  15. Duration for ./text_files/KVyHeuCQVQFHNYNHbyM5sc.txt is 1.7585160732269287
  16. b is running
  17. Duration for ./text_files/4FCgKBL2KCFeRTZGCGJyob.txt is 1.7451090812683105
  18. c is running
  19. Duration for ./text_files/i8AVjwhei8GtFGLjAk6Nei.txt is 1.749945878982544
  20. Duration for ./text_files/cUSMAPx6czHb5k9LN3ZHt5.txt is 1.8131372928619385
  21. duration of a call is 8.080067157745361s
  22. Duration for ./text_files/VXVoFPCJQxn9tJtANC7oQL.txt is 1.7849910259246826
  23. duration of b call is 8.291101932525635s
  24. Duration for ./text_files/ge5duApKJQ9825DMaJeXoj.txt is 1.8465838432312012
  25. duration of c call is 8.607009887695312s
  26. Duration for fast then slow is 12.111705780029297




  """Flask app comparing asyncio.gather() orderings around PostgreSQL pg_sleep.

  Each ``pg_sleep`` coroutine does: file write (CPU), ``SELECT pg_sleep(n)``
  (I/O), file write (CPU). Two routes gather the 1 s / 3 s / 5 s variants in
  opposite orders and report the total elapsed time.
  """
  import argparse
  import asyncio
  import random
  import time

  import psycopg
  import shortuuid
  from flask import Flask

  app = Flask(__name__)


  def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
      """Write ``n`` lines of ``d`` random numbers to ``file_name``.

      Used as a stand-in for CPU-bound work. It is a plain synchronous
      function, so it never yields to the event loop while it runs.

      Args:
          file_name: Path of the file to (over)write.
          d: How many space-separated numbers go on each line.
          n: How many lines to write.
          display_duration: When true, print the elapsed wall-clock time.
      """
      start = time.time()
      with open(file_name, 'w') as f:
          for _ in range(n):
              nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
              f.write(' '.join(nums))
              f.write('\n')
      if display_duration:
          end = time.time()
          print(f"Duration for {file_name} is {end - start}")


  async def pg_sleep(duration: int):
      """Run CPU work, a ``pg_sleep(duration)`` query, then CPU work again.

      Args:
          duration: Seconds passed to PostgreSQL's ``pg_sleep``.
      """
      _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=300000, display_duration=True)
      # NOTE(review): the value after "host=" looks like it was stripped from
      # this listing - fill in the real database host before running.
      async with await psycopg.AsyncConnection.connect(
          "dbname=postgres user=postgres password=12345 port=50000 host="
      ) as aconn:
          async with aconn.cursor() as acur:
              await acur.execute(
                  f"SELECT pg_sleep({duration})"
              )
      _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5, n=300000, display_duration=True)


  @app.route('/fast_to_slow')
  async def fast_to_slow():
      """Gather the 1 s, 3 s and 5 s sleeps in that order; report total time."""
      start = time.time()
      await asyncio.gather(pg_sleep(1), pg_sleep(3), pg_sleep(5))
      end = time.time()
      delay_duration = f'delay {end - start} duration'
      print(delay_duration)
      return delay_duration


  @app.route('/slow_to_fast')
  async def slow_to_fast():
      """Gather the 5 s, 3 s and 1 s sleeps in that order; report total time."""
      start = time.time()
      await asyncio.gather(pg_sleep(5), pg_sleep(3), pg_sleep(1))
      end = time.time()
      delay_duration = f'delay {end - start} duration'
      print(delay_duration)
      return delay_duration


  if __name__ == '__main__':
      arg_parser = argparse.ArgumentParser()
      # The development server rejects a non-integer port, so parse it as int.
      arg_parser.add_argument("--port", type=int)
      port = arg_parser.parse_args().port
      # NOTE(review): the original call had one more leading argument that was
      # lost in extraction (only ", port=port)" survived) - restore if known.
      app.run(port=port)


  1. Duration for ./text_files/SwrTSKNvB6d9gZdxgJGsZU.txt is 1.0756349563598633
  2. Duration for ./text_files/PGjwZmUrGW9wWgbuYMsAih.txt is 1.0807139873504639
  3. Duration for ./text_files/8WmZkViwjbw5cfPQzjuzBS.txt is 1.0467939376831055
  4. Duration for ./text_files/BVgviCNZgPVkGbcHAR8oPG.txt is 1.0906238555908203
  5. Duration for ./text_files/MqSAuH8BSYm2sWLFJ4ueq8.txt is 1.0889880657196045
  6. Duration for ./text_files/USPppWYSjPftv8rZbiwDdV.txt is 1.096479892730713
  7. delay 9.426467895507812 duration
  8. - - [12/Mar/2023 11:08:54] "GET /slow_to_fast HTTP/1.1" 200 -
  9. Duration for ./text_files/6ZS9SL38N5NKFaLdPjpLTN.txt is 1.1848421096801758
  10. Duration for ./text_files/YsXka3m6DELFHXwAc4VSci.txt is 1.0680510997772217
  11. Duration for ./text_files/3dFRbZazhncDC8vhEsjBf3.txt is 1.0647330284118652
  12. Duration for ./text_files/HVfVkUnPLQnAC5V5wq6ACw.txt is 1.0934898853302002
  13. Duration for ./text_files/oHAujuJ5PPqnqqLVS3nuMp.txt is 1.0853149890899658
  14. Duration for ./text_files/hHRuoguqYbVsK6RuGE6HKf.txt is 1.0910439491271973
  15. delay 9.443907022476196 duration
  16. - - [12/Mar/2023 11:09:05] "GET /fast_to_slow HTTP/1.1" 200 -



  1. CPU限制任务,
  2. I/O绑定任务(SQL查询),
  3. CPU限制任务。
    快速复习:Asyncio *专门* 用于I/O绑定任务——这些任务通常会迫使CPU在等待外部响应的一段时间内处于空闲状态。
  """Time asyncio.gather() with a blocking coroutine listed first versus last."""
  import asyncio
  import time
  import timeit


  async def io_bound():
      """Simulate I/O wait: the awaited sleep yields to the event loop."""
      await asyncio.sleep(1)


  async def cpu_bound():
      """Simulate CPU-bound work: ``time.sleep`` blocks without yielding."""
      time.sleep(1)


  async def t1():
      """Gather with the blocking coroutine listed first."""
      await asyncio.gather(cpu_bound(), io_bound(), io_bound())


  async def t2():
      """Gather with the blocking coroutine listed last."""
      await asyncio.gather(io_bound(), io_bound(), cpu_bound())


  # Guarded so that importing this module does not start the benchmark.
  if __name__ == "__main__":
      run = timeit.timeit(lambda: asyncio.run(t1()), number=10)
      print(f"CPU Bound First: {run}")

      run = timeit.timeit(lambda: asyncio.run(t2()), number=10)
      print(f"I/O Bound First: {run}")
  1. CPU Bound First: 20.06762075
  2. I/O Bound First: 10.058394083


