Parallel SELECTs against PostgreSQL in Python

Asked by u0sqgete on 2021-08-25

Hi, we are trying to parallelize a huge SELECT by chopping it up into smaller SELECTs. The dataset has a "segment" column, so we use that to partition the query. The target is a PostgreSQL database. Unfortunately we observe no performance benefit; in other words, performance does not improve linearly with the number of threads we use.
We were able to isolate the behavior into a synthetic test case: we simulate multiple fetches (11), each coming from a generate_series query. We use either 1 connection running the fetches sequentially, or 11 connections running them in parallel. We observe no performance benefit either way.
In contrast, if we instead simulate each fetch as a 1-row query that blocks for 5 seconds (QUERY1), we do get the expected performance benefit.
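For context, a minimal sketch of the per-segment split described above; the "sales" table and its column list are hypothetical stand-ins, not the real schema:

    # Hypothetical sketch of the segmented SELECT pattern described above;
    # the "sales" table and its columns are illustrative assumptions only.
    SEGMENT_QUERY = '''
        select item_id, brand_name, units, revenue, segment
        from sales
        where segment = %s
    '''

    def fetch_one_segment(conn, segment):
        # Each worker runs the same query, parameterized by its segment value.
        with conn.cursor() as curs:
            curs.execute(SEGMENT_QUERY, (segment,))
            return curs.fetchall()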
The main code we use to parallelize:

    def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
        # Fetch one segment's rows on a pooled connection.
        print(f"TASK is {segment}")
        sql_query = config.QUERY2
        with the_conn_pool.getconn() as conn:
            conn.set_session(readonly=True, autocommit=True)
            start = default_timer()
            with conn.cursor() as curs:
                curs.execute(sql_query)
                data = curs.fetchall()
            end = default_timer()
            print(f'DB to retrieve {segment} took : {end - start:.5f}')
            the_conn_pool.putconn(conn)
        return data

    def get_sales(the_conn_pool) -> pd.DataFrame:
        tasks: Dict = {}
        start = default_timer()
        with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
            # Submit one fetch task per segment.
            for segment in range(0, config.SEGMENTS_NO):
                task = executor.submit(pandas_per_segment,
                                       the_conn_pool=the_conn_pool,
                                       segment=segment)
                tasks[task] = segment
        end = default_timer()
        print(f'Consumed : {end - start:.5f}')
        start = default_timer()
        master_list = [task.result() for task in tasks]
        result = pd.DataFrame(itertools.chain(*master_list),
                              columns=['item_id', 'brand_name', 'is_exclusive',
                                       'units', 'revenue', 'abs_price',
                                       'segment', 'matches_filter'])
        end = default_timer()
        print(f'Chained : {end - start:.5f}')
        return result

By fetching directly from CSVs we also saw the same performance benefit.
Our theory is that sockets/threads/large data fetches simply do not parallelize in Python.
Is this correct? Are we doing something wrong? One way we could test the theory is sketched below.
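A minimal sketch, assuming the GIL (rather than the database) is the bottleneck: run the same fetches with ProcessPoolExecutor so each worker gets its own interpreter. Each process has to open its own connection, since psycopg2 connections cannot be shared across processes. This is an illustration only, not a benchmarked variant:

    # Sketch only: process-based variant to take the GIL out of the picture.
    from concurrent import futures

    import psycopg2

    import config

    def fetch_segment_in_process(segment):
        # Each worker process opens its own connection; a shared pool
        # cannot be used across process boundaries.
        conn = psycopg2.connect(
            host=config.HOST, port=config.PORT, dbname=config.DBNAME,
            user=config.USER, password=config.PASSWORD)
        try:
            with conn.cursor() as curs:
                curs.execute(config.QUERY2)
                return curs.fetchall()
        finally:
            conn.close()

    def get_sales_with_processes():
        with futures.ProcessPoolExecutor(max_workers=config.SEGMENTS_NO) as executor:
            return list(executor.map(fetch_segment_in_process,
                                     range(config.SEGMENTS_NO)))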
Tested on Big Sur x64, Python 3.9.6, PostgreSQL 13; the rest of the code is attached below.
Our docker-compose file:

    version: '2'
    services:
      database:
        container_name: posgres
        image: 'docker.io/bitnami/postgresql:latest'
        ports:
          - '5432:5432'
        volumes:
          - 'postgresql_data:/bitnami/postgresql'
        environment:
          - POSTGRESQL_USERNAME=my_user
          - POSTGRESQL_PASSWORD=password123
          - POSTGRESQL_DATABASE=mn_dataset
        networks:
          - pgtapper
    volumes:
      postgresql_data:
        driver: local
    networks:
      pgtapper:
        driver: bridge

The config.py file:

    TASKS = 1
    SEGMENTS_NO = 11
    HOST = 'localhost'
    PORT = 5432
    DBNAME = 'mn_dataset'
    USER = 'my_user'
    PASSWORD = 'password123'
    # PORT=15433
    # DBNAME='newron'
    # USER='flyway'
    # PASSWORD='8P87PE8HKuvjQaAP'
    CONNECT_TIMEOUT = 600
    QUERY1 = '''
    select
        123456789 as item_id,
        'm$$$' as brand_name,
        true as is_exclusive,
        0.409 as units,
        0.567 as revenue,
        0.999 as abs_price,
        'aaaa' as segment,
        TRUE as matches_filter
    from (select pg_sleep(5)) xxx
    '''
    QUERY3 = '''
    select * from t1 LIMIT 10000
    '''
    QUERY2 = '''
    select
        123456789 as item_id,
        'm$$$' as brand_name,
        true as is_exclusive,
        0.409 as units,
        0.567 as revenue,
        0.999 as abs_price,
        'aaaa' as segment,
        TRUE as matches_filter
    from generate_series(1, 10000)
    '''
    MYSQL_QUERY = '''
    select
        123456789 as item_id,
        'm$$$' as brand_name,
        true as is_exclusive,
        0.409 as units,
        0.567 as revenue,
        0.999 as abs_price,
        'aaaa' as segment,
        TRUE as matches_filter
    from t1
    limit 10000
    '''

And our full example:

    # This is a sample Python script.
    # Press ⌃R to execute it or replace it with your code.
    # Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
    import itertools
    from concurrent import futures
    from timeit import default_timer
    from typing import Dict, List, Tuple

    import pandas as pd
    from psycopg2.pool import ThreadedConnectionPool

    import config

    def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
        # Fetch one segment's rows on a pooled connection.
        print(f"TASK is {segment}")
        sql_query = config.QUERY2
        with the_conn_pool.getconn() as conn:
            conn.set_session(readonly=True, autocommit=True)
            start = default_timer()
            with conn.cursor() as curs:
                curs.execute(sql_query)
                data = curs.fetchall()
            end = default_timer()
            print(f'DB to retrieve {segment} took : {end - start:.5f}')
            the_conn_pool.putconn(conn)
        return data

    def get_sales(the_conn_pool) -> pd.DataFrame:
        tasks: Dict = {}
        start = default_timer()
        with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
            # Submit one fetch task per segment.
            for segment in range(0, config.SEGMENTS_NO):
                task = executor.submit(pandas_per_segment,
                                       the_conn_pool=the_conn_pool,
                                       segment=segment)
                tasks[task] = segment
        end = default_timer()
        print(f'Consumed : {end - start:.5f}')
        start = default_timer()
        master_list = [task.result() for task in tasks]
        result = pd.DataFrame(itertools.chain(*master_list),
                              columns=['item_id', 'brand_name', 'is_exclusive',
                                       'units', 'revenue', 'abs_price',
                                       'segment', 'matches_filter'])
        end = default_timer()
        print(f'Chained : {end - start:.5f}')
        return result

    # Press the green button in the gutter to run the script.
    if __name__ == '__main__':
        connection_pool = ThreadedConnectionPool(
            minconn=config.TASKS,
            maxconn=config.TASKS,
            host=config.HOST,
            port=config.PORT,
            dbname=config.DBNAME,
            user=config.USER,
            password=config.PASSWORD,
            connect_timeout=config.CONNECT_TIMEOUT
        )
        get_sales(connection_pool)

    # See PyCharm help at https://www.jetbrains.com/help/pycharm/

Answer 1, by jaql4c8m:

For the generate_series query, almost all of the time is spent in Python reading and processing the data, and almost none in PostgreSQL computing and sending it.
It looks like something in ThreadedConnectionPool (possibly the Global Interpreter Lock) coordinates access to the database connections (using a futex), so only one thread at a time can be "active" in Python, across all of them. So while many queries can run against the database concurrently, that does not help you, because almost no time is actually being spent on the database side.
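A minimal sketch of how to see that split with psycopg2's default client-side cursor: time execute() (server work plus pulling the raw result over the wire) separately from fetchall() (converting rows into Python tuples, which runs under the GIL). For the generate_series query the second number should dominate:

    # Sketch: split timing into server/transfer vs. Python-side conversion.
    from timeit import default_timer

    def profile_fetch(conn, sql):
        with conn.cursor() as curs:
            t0 = default_timer()
            curs.execute(sql)       # query runs; raw result is pulled client-side
            t1 = default_timer()
            data = curs.fetchall()  # rows become Python tuples (GIL-bound work)
            t2 = default_timer()
        print(f'execute: {t1 - t0:.5f}s fetchall: {t2 - t1:.5f}s')
        return data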
