python 如何Postgresql复制到标准与CSV做的冲突做更新?

4uqofj5v  于 2022-12-17  发布在  Python
关注(0)|答案(4)|浏览(137)

我要做的

" on conflict (time) do update set name , description "

但是我不知道当我使用标准输入和csv时,我不知道什么名称等于什么?什么描述等于什么...
表_a:

xxx.csv:

with open('xxx/xxx.csv', 'r', encoding='utf8') as f:
    sql = """
    COPY table_a FROM STDIN With CSV on conflict (time) 
    do update set name=??, description=??;
    """
    cur.copy_expert(sql, f)
    conn.commit()
4ngedf3f

4ngedf3f1#

this SO post中,有两个答案(结合在一起)为成功使用ON CONFLICT提供了一个很好的解决方案。下面的示例使用ON CONFLICT DO NOTHING;

BEGIN;
CREATE TEMP TABLE tmp_table 
(LIKE main_table INCLUDING DEFAULTS)
ON COMMIT DROP;
    
COPY tmp_table FROM 'full/file/name/here';
    
INSERT INTO main_table
SELECT *
FROM tmp_table
ON CONFLICT DO NOTHING;
COMMIT;

用表的名称替换main_table的两个示例。

w6lpcovy

w6lpcovy2#

感谢各位大师的解答。
这就是我的解决方案。

sql = """
CREATE TABLE temp_h (
    time ,
    name,
    description
);
COPY temp_h FROM STDIN With CSV;

INSERT INTO table_a(time, name, description)
SELECT *
FROM temp_h ON conflict (time) 
DO update set name=EXCLUDED.name, description=EXCLUDED.description;

DROP TABLE temp_h;
"""
xesrikrc

xesrikrc3#

我已经成功地使用以下函数完成了一个批量 upsert(欢迎提出建议):

import io
from sqlalchemy.engine import Engine
from sqlalchemy.ext import declarative_base

BaseModel = declarative_base()

def upsert_bulk(engine: Engine, model: BaseModel, data: io.StringIO) -> None:
    """
    Fast way to upsert multiple entries at once

    :param `db`: DB Session
    :param `data`: CSV in a stream object
    """
    table_name = model.__tablename__
    temp_table_name = f"temp_{table_name}"

    columns = [c.key for c in model.__table__.columns]

    # Select only columns to be updated (in my case, all non-id columns)
    variable_columns = [c for c in columns if c != "id"]

    # Create string with set of columns to be updated
    update_set = ", ".join([f"{v}=EXCLUDED.{v}" for v in variable_columns])

    # Rewind data and prepare it for `copy_from`
    data.seek(0)

    with conn.cursor() as cur:
        # Creates temporary empty table with same columns and types as
        # the final table
        cur.execute(
            f"""
            CREATE TEMPORARY TABLE {temp_table_name} (LIKE {table_name})
            ON COMMIT DROP
            """
        )

        # Copy stream data to the created temporary table in DB
        cur.copy_from(data, temp_table_name)

        # Inserts copied data from the temporary table to the final table
        # updating existing values at each new conflict
        cur.execute(
            f"""
            INSERT INTO {table_name}({', '.join(columns)})
            SELECT * FROM {temp_table_name}
            ON CONFLICT (id) DO UPDATE SET {update_set}
            """
        )

        # Drops temporary table (I believe this step is unnecessary,
        # but tables sizes where growing without any new data modifications
        # if this command isn't executed)
        cur.execute(f"DROP TABLE {temp_table_name}")

        # Commit everything through cursor
        conn.commit()

    conn.close()

相关问题