更好的方法将非常大的数据插入PostgreSQL表

tquggr8v  于 11个月前  发布在  PostgreSQL
关注(0)|答案(2)|浏览(142)

在PostgreSQL表中插入一个非常大的数据的更好的方法是什么?
操作系统:Ubuntu 22.04 LTS
数据库:PostgreSQL 14
框架:Python 3.11 Django
现在我使用insert into语句,每次插入100,000行。平均插入1,000,000行的整个过程需要2分钟,这在我可以接受的范围内。但我想知道是否有更好的方法来做到这一点。
它工作得很好,但不知何故,它需要更多的时间,有时给错误,如

OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly

个字符

fslejnso

fslejnso1#

一种更快的方法是在会话中使用一个预处理语句,然后用一批新的行重复执行exec
一个更快的方法是使用COPY(在未记录的表上可以选择WITH (FREEZE)https://www.postgresql.org/docs/current/sql-copy.html),然后添加索引和约束。

km0tfn4u

km0tfn4u2#

我建议使用像Celery这样的任务队列来处理Django中的大型数据库插入。原因如下:
防止请求超时:在请求上下文中长时间运行的数据库操作可能会超过超时限制,从而导致错误。任务队列支持异步执行,因此当数据库任务在后台运行时,请求可以立即返回。提高可扩展性:任务队列将工作分配给多个工作进程,从而提高性能并有效地处理较大的工作负载。提供监视和恢复机制:Celery提供了一些工具来监控任务执行,重试失败的任务,并优雅地处理错误。这里有一个简单的例子来说明如何使用Celery实现这一目的:

1.安装Celery:

第一个月

2.配置Celery:

设置Celery代理(例如RabbitMQ或Redis)进行任务通信。在Django应用中定义Celery任务。3.创建Celery任务:

from celery import shared_task

@shared_task
def process_and_insert_data():
    batch_size = 100000  # Adjust as needed
    offset = 0

    while True:
        with connection.cursor() as cursor:
            transaction_list_query = f"SELECT * FROM {source_table} LIMIT {batch_size} OFFSET {offset};"
            cursor.execute(transaction_list_query)
            transaction_list = dictfetchall(cursor)

            if not transaction_list:
                break

            data_to_insert = []
            for transaction in transaction_list:
                # Perform process-intensive calculations here
                data_to_insert.append(...)

            insert_query = f"""
                INSERT INTO {per_transaction_table} ({company_ref_id_id_column}, {rrn_column},
                    {transaction_type_ref_id_id_column}, {transactionamount_column})
                VALUES {','.join(['%s'] * len(data_to_insert))}
                ON CONFLICT ({rrn_column}) DO UPDATE SET {company_ref_id_id_column} = EXCLUDED.{company_ref_id_id_column};
            """
            cursor.executemany(insert_query, data_to_insert)

        offset += batch_size

字符串

4.发起任务:

from .tasks import process_and_insert_data

# Within a view or other part of your code:
process_and_insert_data.delay()


记住:
根据您的特定需求调整批量大小和其他参数。如果适用,请考虑使用Django的ORM进行任务内的数据库交互。探索Celery用于监控,错误处理和任务优先级的高级功能。

相关问题