嗨,我们试图通过把一个巨大的 select 按数据集中的“段”列切分为多个较小的 select 来并行化它。我们的目标数据库是 PostgreSQL。不幸的是,我们没有观察到性能优势——换句话说,性能并没有随我们使用的线程数线性提升。
我们能够将观察结果分离到一个合成测试用例中。我们将多个回迁(11)模拟为每个回迁来自generate_series查询。
我们使用1个连接,每个连接按顺序运行,或者11个连接并行运行。
我们没有观察到任何性能优势。
相反,如果我们只是将取回模拟为阻塞5秒(query1)的1行取回,我们就获得了预期的性能优势。
我们用来并行化的主代码。
def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
    """Fetch one segment's rows on a pooled connection and return them.

    Args:
        the_conn_pool: psycopg2 ``ThreadedConnectionPool`` to borrow from.
        segment: segment identifier; used only for logging here, since the
            query is fixed to ``config.QUERY2`` (presumably the real query
            filters by segment -- confirm).

    Returns:
        All rows produced by the query, as a list of tuples.
    """
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    conn = the_conn_pool.getconn()
    try:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    finally:
        # BUG FIX: the original called putconn() inside the `with` body, so an
        # exception leaked the pooled connection. Also note that in psycopg2
        # `with conn:` only delimits a transaction -- it does NOT return the
        # connection to the pool -- so try/finally is the correct shape.
        the_conn_pool.putconn(conn)
    return data
def get_sales(the_conn_pool) -> pd.DataFrame:
    """Fan out one fetch per segment across a thread pool and merge results.

    Args:
        the_conn_pool: psycopg2 ``ThreadedConnectionPool`` shared by workers.

    Returns:
        DataFrame with one row per tuple fetched across all segments.
    """
    tasks: Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                                   the_conn_pool=the_conn_pool,
                                   segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end - start:.5f}')
    start = default_timer()
    # BUG FIX: the original read `[task.result() or task in tasks]` -- a
    # boolean expression, not a comprehension -- which built a single-element
    # list from the last submitted task only. `for` collects every result.
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list),
                          columns=['item_id', 'brand_name', 'is_exclusive',
                                   'units', 'revenue', 'abs_price',
                                   'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result
通过从CSV直接获取,我们也看到了相同的性能优势。
从理论上讲,是不是 Python 中的套接字/线程在做大数据量获取时本来就无法有效并行?
这是正确的吗?我们做错什么了吗。
在 macOS Big Sur x64、Python 3.9.6、PostgreSQL 13 上测试,其余代码随附。
我们的 Docker 正在整理文件
# docker-compose definition for a local Bitnami PostgreSQL test database.
version: '2'
services:
  database:
    # NOTE(review): "posgres" looks like a typo for "postgres" -- kept as-is
    # since other tooling may reference this container name; confirm and rename.
    container_name: posgres
    image: 'docker.io/bitnami/postgresql:latest'
    ports:
      - '5432:5432'
    volumes:
      - 'postgresql_data:/bitnami/postgresql'
    environment:
      - POSTGRESQL_USERNAME=my_user
      - POSTGRESQL_PASSWORD=password123
      - POSTGRESQL_DATABASE=mn_dataset
    networks:
      - pgtapper
volumes:
  postgresql_data:
    driver: local
networks:
  pgtapper:
    driver: bridge
config.py文件
# Thread-pool / segmentation settings for the parallel-fetch experiment.
TASKS = 1  # max_workers for the ThreadPoolExecutor (and pool min/max size)
SEGMENTS_NO = 11  # number of segment fetches submitted to the executor
# Local PostgreSQL connection details (match the docker-compose service).
# NOTE(review): plaintext test credentials -- acceptable for a local repro only.
HOST='localhost'
PORT=5432
DBNAME='mn_dataset'
USER='my_user'
PASSWORD='password123'
# PORT=15433
# DBNAME='newron'
# USER='flyway'
# PASSWORD='8P87PE8HKuvjQaAP'
CONNECT_TIMEOUT=600
# QUERY1: one constant row returned after a 5-second server-side sleep --
# simulates a slow, tiny fetch (the case where threading gave a speed-up).
QUERY1 = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from (select pg_sleep(5)) xxx
'''
# QUERY3: fetch from a real table, capped at 10k rows.
QUERY3 = '''
select * from t1 LIMIT 10000
'''
# QUERY2: 10k constant rows via generate_series -- simulates a fast, bulk
# fetch (the case where threading showed no speed-up).
QUERY2 = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from generate_series(1, 10000)
'''
# MYSQL_QUERY: equivalent bulk query, presumably for MySQL which lacks
# generate_series (reads a real table t1 instead) -- confirm.
MYSQL_QUERY = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from t1
limit 10000
'''
还有我们的全部例子
# This is a sample Python script.
# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import itertools
from psycopg2.pool import ThreadedConnectionPool
from concurrent import futures
from timeit import default_timer
from typing import Dict, List, Tuple
import config
import pandas as pd
def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
    """Fetch one segment's rows on a pooled connection and return them.

    Args:
        the_conn_pool: psycopg2 ``ThreadedConnectionPool`` to borrow from.
        segment: segment identifier; used only for logging here, since the
            query is fixed to ``config.QUERY2`` (presumably the real query
            filters by segment -- confirm).

    Returns:
        All rows produced by the query, as a list of tuples.
    """
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    conn = the_conn_pool.getconn()
    try:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    finally:
        # BUG FIX: the original called putconn() inside the `with` body, so an
        # exception leaked the pooled connection. Also note that in psycopg2
        # `with conn:` only delimits a transaction -- it does NOT return the
        # connection to the pool -- so try/finally is the correct shape.
        the_conn_pool.putconn(conn)
    return data
def get_sales(the_conn_pool) -> pd.DataFrame:
    """Fan out one fetch per segment across a thread pool and merge results.

    Args:
        the_conn_pool: psycopg2 ``ThreadedConnectionPool`` shared by workers.

    Returns:
        DataFrame with one row per tuple fetched across all segments.
    """
    tasks: Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                                   the_conn_pool=the_conn_pool,
                                   segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end - start:.5f}')
    start = default_timer()
    # BUG FIX: the original read `[task.result() or task in tasks]` -- a
    # boolean expression, not a comprehension -- which built a single-element
    # list from the last submitted task only. `for` collects every result.
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list),
                          columns=['item_id', 'brand_name', 'is_exclusive',
                                   'units', 'revenue', 'abs_price',
                                   'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result
# Script entry point: build the pool and run the fan-out fetch once.
if __name__ == '__main__':
    # Pool is sized exactly to the worker count so every thread can hold a
    # connection at the same time.
    pool_settings = dict(
        minconn=config.TASKS,
        maxconn=config.TASKS,
        host=config.HOST,
        port=config.PORT,
        dbname=config.DBNAME,
        user=config.USER,
        password=config.PASSWORD,
        connect_timeout=config.CONNECT_TIMEOUT,
    )
    connection_pool = ThreadedConnectionPool(**pool_settings)
    get_sales(connection_pool)
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
1条答案
按热度按时间jaql4c8m1#
对于generate_series查询,几乎所有的时间都花在python读取和处理数据上,而几乎没有时间花在postgresql计算和发送数据上。
看起来是某种机制——更可能是全局解释器锁(GIL),而不是 ThreadedConnectionPool 本身——在协调对数据库连接的访问(底层通过 futex 等待)。因此,在 Python 中同一时刻只能有一个线程真正执行字节码(跨所有线程)。所以,虽然多条查询可以同时在数据库上运行,但这对你没有帮助,因为时间几乎都花在 Python 侧读取和处理数据上,而不是数据库侧。