如何使用pyodbc加速对ms sql server的大容量插入

cczfrluj  于 2021-09-29  发布在  Java
关注(0)|答案(5)|浏览(412)

下面是我的代码,我想得到一些帮助。我必须在1300000行上运行它,这意味着插入300000行最多需要40分钟。
我想批量插入是加快速度的途径吗?还是因为我通过 for data in reader: 一份?


# Opens the prepped csv file

with open (os.path.join(newpath,outfile), 'r') as f:
    #hooks csv reader to file
    reader = csv.reader(f)
    #pulls out the columns (which match the SQL table)
    columns = next(reader)
    #trims any extra spaces
    columns = [x.strip(' ') for x in columns]
    #starts SQL statement
    query = 'bulk insert into SpikeData123({0}) values ({1})'
    #puts column names in SQL query 'query'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))

    print 'Query is: %s' % query
    #starts curser from cnxn (which works)
    cursor = cnxn.cursor()
    #uploads everything by row
    for data in reader:
        cursor.execute(query, data)
        cursor.commit()

我故意动态地选择我的列标题(因为我想创建尽可能多的pythonic代码)。
spikedata123是表名。

zpf6vheq

zpf6vheq1#

更新-2021年7月:bcpyaz是微软的 bcp 效用。
2019年4月更新:如@simonlang的评论所述, BULK INSERT 在sql server 2017及更高版本中,显然支持csv文件中的文本限定符(参考:此处)。
大容量插入几乎肯定比逐行读取源文件并对每行执行常规插入快得多。但是,批量插入和bcp对于csv文件都有一个很大的限制,因为它们不能处理文本限定符(参考:此处)。也就是说,如果您的csv文件中没有限定的文本字符串。。。

1,Gord Thompson,2015-04-15
2,Bob Loblaw,2015-04-07

... 然后您可以批量插入它,但如果它包含文本限定符(因为某些文本值包含逗号)。。。

1,"Thompson, Gord",2015-04-15
2,"Loblaw, Bob",2015-04-07

... 那么大容量插入无法处理它。不过,将这样的csv文件预处理为管道分隔的文件总体上可能会更快。。。

1|Thompson, Gord|2015-04-15
2|Loblaw, Bob|2015-04-07

... 或制表符分隔的文件(其中 表示制表符)。。。

1→Thompson, Gord→2015-04-15
2→Loblaw, Bob→2015-04-07

... 然后批量插入该文件。对于后一个(制表符分隔的)文件,大容量插入代码如下所示:

import pypyodbc
conn_str = "DSN=myDb_SQLEXPRESS;"
cnxn = pypyodbc.connect(conn_str)
crsr = cnxn.cursor()
sql = """
BULK INSERT myDb.dbo.SpikeData123
FROM 'C:\\__tmp\\biTest.txt' WITH (
    FIELDTERMINATOR='\\t',
    ROWTERMINATOR='\\n'
    );
"""
crsr.execute(sql)
cnxn.commit()
crsr.close()
cnxn.close()

注:如注解中所述,执行 BULK INSERT 语句仅在sql server示例可以直接读取源文件时适用。有关源文件位于远程客户端上的情况,请参阅此答案。

ijnw1ujt

ijnw1ujt2#

正如在对另一个答案的评论中所指出的,t-sql BULK INSERT 仅当要导入的文件与sql server示例位于同一台计算机上,或者位于sql server示例可以读取的smb/cifs网络位置时,该命令才起作用。因此,它可能不适用于源文件位于远程客户端的情况。
pyodbc 4.0.19添加了一个游标快速执行功能,在这种情况下可能会有所帮助。 fast_executemany 默认情况下为“关闭”,并且以下测试代码。。。

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.time()
crsr.executemany(sql, params)
print(f'{time.time() - t0:.1f} seconds')

... 在我的测试机器上执行大约需要22秒。简单地添加 crsr.fast_executemany = True ...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

crsr.fast_executemany = True  # new in pyodbc 4.0.19

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.time()
crsr.executemany(sql, params)
print(f'{time.time() - t0:.1f} seconds')

... 将执行时间缩短到略多于1秒。

kdfy810k

kdfy810k3#

是的,大容量插入是将大型文件加载到数据库的正确路径。乍一看,我想说,之所以需要如此长的时间,是因为您提到,您正在循环文件中的每一行数据,这实际上意味着删除了使用批量插入的好处,并使其与普通插入一样。请记住,因为它的名称意味着它用于插入数据卡盘。我将删除循环并重试。
另外,我会仔细检查批量插入的语法,因为我觉得它不正确。检查pyodbc生成的sql,因为我感觉它可能只是在执行一个普通的插入
或者,如果仍然很慢,我会尝试直接从sql使用大容量插入,或者使用大容量插入将整个文件加载到临时表中,然后将相关列插入到正确的表中。或者混合使用批量插入和bcp来获得插入的特定列或openrowset。

x6492ojm

x6492ojm4#

这个问题让我很沮丧,我没有看到使用它有多大的改进 fast_executemany 直到我在网上找到这篇文章。具体来说,布莱恩·巴利亚奇(bryan bailliache)对马克斯·瓦尔查尔(max varchar)的评论。我一直在使用sqlalchemy,即使确保更好的数据类型参数也不能解决这个问题;然而,切换到pyodbc确实如此。我还采纳了迈克尔·莫拉的建议,使用临时桌,发现它节省了更多的时间。我写了一个函数,以防有人发现它有用。我写这篇文章是为了获取一个列表或一系列列表作为插入。我使用sqlalchemy和pandas插入了相同的数据 to_sql 从有时需要40分钟到4秒。不过,我可能误用了我以前的方法。
连接

def mssql_conn():
    conn = pyodbc.connect(driver='{ODBC Driver 17 for SQL Server}',
                          server=os.environ.get('MS_SQL_SERVER'),
                          database='EHT',
                          uid=os.environ.get('MS_SQL_UN'),
                          pwd=os.environ.get('MS_SQL_PW'),
                          autocommit=True)
    return conn

插入函数

def mssql_insert(table,val_lst,truncate=False,temp_table=False):
    '''Use as direct connection to database to insert data, especially for
       large inserts. Takes either a single list (for one row),
       or list of list (for multiple rows). Can either append to table
       (default) or if truncate=True, replace existing.'''
    conn = mssql_conn()
    cursor = conn.cursor()
    cursor.fast_executemany = True
    tt = False
    qm = '?,'
    if isinstance(val_lst[0],list):
        rows = len(val_lst)
        params = qm * len(val_lst[0])
    else:
        rows = 1
        params = qm * len(val_lst)
        val_lst = [val_lst]
    params = params[:-1]
    if truncate:
        cursor.execute(f"TRUNCATE TABLE {table}")
    if temp_table:
        #create a temp table with same schema
        start_time = time.time()
        cursor.execute(f"SELECT * INTO ##{table} FROM {table} WHERE 1=0")
        table = f"##{table}"
        #set flag to indicate temp table was used
        tt = True
    else:
        start_time = time.time()
    #insert into either existing table or newly created temp table
    stmt = f"INSERT INTO {table} VALUES ({params})"
    cursor.executemany(stmt,val_lst)
    if tt:
        #remove temp moniker and insert from temp table
        dest_table = table[2:]
        cursor.execute(f"INSERT INTO {dest_table} SELECT * FROM {table}")
        print('Temp table used!')
        print(f'{rows} rows inserted into the {dest_table} table in {time.time() - 
              start_time} seconds')
    else:
        print('No temp table used!')
        print(f'{rows} rows inserted into the {table} table in {time.time() - 
              start_time} seconds')
    cursor.close()
    conn.close()

我的控制台结果首先使用临时表,然后不使用临时表(在这两种情况下,表在执行时包含数据,truncate=true):

No temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 10.595500707626343 
seconds

Temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 3.810380458831787 
seconds
0yg35tkg

0yg35tkg5#

fwiw,我给出了一些方法来插入到SQLServer中,这是我自己的一些测试。通过使用sql server批处理和pyodbccursor.execute语句,我实际上能够获得最快的结果。我没有测试save-to-csv和bulk-insert,我想知道它们之间的比较。
以下是我关于我所做测试的博客:http://jonmorisissqlblog.blogspot.com/2021/05/python-pyodbc-and-batch-inserts-to-sql.html

相关问题