python中与postgresql并行的select

u0sqgete  于 2021-08-25  发布在  Java
关注(0)|答案(1)|浏览(447)

嗨,我们试图通过将一个巨大的选择切碎为较小的选择来并行化它。数据集有一个“段”列,因此我们使用它作为划分select的方法。我们的目标是一个posgresql数据库。不幸的是,我们没有观察到性能优势,换句话说,性能的提高与我们使用的线程成线性关系。
我们能够将观察结果分离到一个合成测试用例中。我们将多个回迁(11)模拟为每个回迁来自generate_series查询。
我们使用1个连接,每个连接按顺序运行,或者11个连接并行运行。
我们没有观察到任何性能优势。
相反,如果我们只是将取回模拟为阻塞5秒(query1)的1行取回,我们就获得了预期的性能优势。
我们用来并行化的主代码。

def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data

def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks : Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                            the_conn_pool = the_conn_pool,
                            segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    master_list = [task.result() or task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result

通过从CSV直接获取,我们也看到了相同的性能优势。
理论上讲,python中的套接字/线程/大数据获取不起作用。
这是正确的吗?我们做错什么了吗。
在BigSurx64、Python3.9.6、postgresql 13上进行测试,其余代码随附
我们的 Docker 正在整理文件

version: '2'

services:

  database:
    container_name:
      posgres
    image: 'docker.io/bitnami/postgresql:latest'
    ports:
      - '5432:5432'
    volumes:
      - 'postgresql_data:/bitnami/postgresql'
    environment:
      - POSTGRESQL_USERNAME=my_user
      - POSTGRESQL_PASSWORD=password123
      - POSTGRESQL_DATABASE=mn_dataset
    networks:
      - pgtapper

volumes:
  postgresql_data:
    driver: local

networks:
  pgtapper:
    driver: bridge

config.py文件

TASKS = 1
SEGMENTS_NO = 11
HOST='localhost'

PORT=5432
DBNAME='mn_dataset'
USER='my_user'
PASSWORD='password123'

# PORT=15433

# DBNAME='newron'

# USER='flyway'

# PASSWORD='8P87PE8HKuvjQaAP'

CONNECT_TIMEOUT=600

QUERY1 = '''
select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from (select pg_sleep(5)) xxx
'''

QUERY3 = '''
 select * from t1 LIMIT 10000
'''

QUERY2 = '''
 select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from generate_series(1, 10000)
'''

MYSQL_QUERY = '''
select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from t1
limit 10000
'''

还有我们的全部例子


# This is a sample Python script.

# Press ⌃R to execute it or replace it with your code.

# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.

import itertools

from psycopg2.pool import ThreadedConnectionPool

from concurrent import futures
from timeit import default_timer
from typing import Dict, List, Tuple
import config
import pandas as pd

def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data

def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks : Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                            the_conn_pool = the_conn_pool,
                            segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    master_list = [task.result() or task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result

# Press the green button in the gutter to run the script.

if __name__ == '__main__':
    connection_pool = ThreadedConnectionPool(
        minconn=config.TASKS,
        maxconn=config.TASKS,
        host=config.HOST,
        port=config.PORT,
        dbname=config.DBNAME,
        user=config.USER,
        password=config.PASSWORD,
        connect_timeout=config.CONNECT_TIMEOUT
    )
    get_sales(connection_pool)

# See PyCharm help at https://www.jetbrains.com/help/pycharm/
jaql4c8m

jaql4c8m1#

对于generate_series查询,几乎所有的时间都花在python读取和处理数据上,而几乎没有时间花在postgresql计算和发送数据上。
看起来像threadedconnectionpool之类的东西(可能是全局解释器锁)协调对数据库连接的访问(使用 futex )因此,在python中,一次只能有一个线程处于“活动”状态(跨越所有线程)。因此,虽然许多查询可以同时在数据库上运行,但这对您没有帮助,因为实际上几乎没有时间花在这方面。

相关问题