如何使用 pyodbc 加速批量插入 MS SQL Server

2025-01-07 08:44:00
admin
原创
143
摘要:问题描述:以下是我需要帮助的代码。我必须运行超过 1,300,000 行的代码,这意味着插入约 300,000 行需要最多40 分钟。我认为批量插入是加快速度的途径?还是因为我通过for data in reader:部分遍历行?#Opens the prepped csv file with open (o...

问题描述:

以下是我需要帮助的代码。我必须运行超过 1,300,000 行的代码,这意味着插入约 300,000 行需要最多40 分钟。

我认为批量插入是加快速度的途径?还是因为我通过for data in reader:部分遍历行?

#Opens the prepped csv file
with open (os.path.join(newpath,outfile), 'r') as f:
    #hooks csv reader to file
    reader = csv.reader(f)
    #pulls out the columns (which match the SQL table)
    columns = next(reader)
    #trims any extra spaces
    columns = [x.strip(' ') for x in columns]
    #starts SQL statement
    query = 'bulk insert into SpikeData123({0}) values ({1})'
    #puts column names in SQL query 'query'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))

    print 'Query is: %s' % query
    #starts curser from cnxn (which works)
    cursor = cnxn.cursor()
    #uploads everything by row
    for data in reader:
        cursor.execute(query, data)
        cursor.commit()

我正在故意动态地选择我的列标题(因为我想创建尽可能最符合 Python 风格的代码)。

SpikeData123 是表名。


解决方案 1:

正如对另一个答案的评论中所述,BULK INSERT只有当要导入的文件与 SQL Server 实例位于同一台计算机上或位于 SQL Server 实例可以读取的 SMB/CIFS 网络位置时,T-SQL 命令才会起作用。因此,当源文件位于远程客户端上时,它可能不适用。

pyodbc 4.0.19 添加了Cursor#fast_executemany功能,在这种情况下可能会有帮助。fast_executemany默认情况下处于“关闭”状态,以下测试代码...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')

... 在我的测试机器上执行大约需要 22 秒。只需添加crsr.fast_executemany = True...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

crsr.fast_executemany = True  # new in pyodbc 4.0.19

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')

...将执行时间减少到仅 1 秒多一点。

解决方案 2:

更新 - 2022 年 5 月:bcpandas和bcpyaz是 Microsoftbcp实用程序的包装器。


更新 - 2019 年 4 月:正如 @SimonLang 的评论所指出的,BULK INSERTSQL Server 2017 及更高版本显然支持 CSV 文件中的文本限定符(参考:此处)。


BULK INSERT 几乎肯定会比逐行读取源文件并对每行执行常规 INSERT 快得多。但是,BULK INSERT 和 BCP 在 CSV 文件方面都存在很大的限制,因为它们无法处理文本限定符(参考:此处)。也就是说,如果您的 CSV 文件中没有合格的文本字符串...

1,Gord Thompson,2015-04-15
2,Bob Loblaw,2015-04-07

...然后您可以批量插入它,但如果它包含文本限定符(因为某些文本值包含逗号)...

1,"Thompson, Gord",2015-04-15
2,"Loblaw, Bob",2015-04-07

... 那么 BULK INSERT 无法处理它。不过,将这样的 CSV 文件预处理为竖线分隔文件可能总体上会更快...

1|Thompson, Gord|2015-04-15
2|Loblaw, Bob|2015-04-07

...或制表符分隔的文件(其中代表制表符)...

1→Thompson, Gord→2015-04-15
2→Loblaw, Bob→2015-04-07

...然后批量插入该文件。对于后者(制表符分隔)文件,批量插入代码将如下所示:

import pypyodbc
conn_str = "DSN=myDb_SQLEXPRESS;"
cnxn = pypyodbc.connect(conn_str)
crsr = cnxn.cursor()
sql = """
BULK INSERT myDb.dbo.SpikeData123
FROM 'C:\\__tmp\\biTest.txt' WITH (
    FIELDTERMINATOR='\\t',
    ROWTERMINATOR='\\n'
    );
"""
crsr.execute(sql)
cnxn.commit()
crsr.close()
cnxn.close()

注意:如注释中所述,BULK INSERT仅当 SQL Server 实例可以直接读取源文件时,执行语句才适用。对于源文件位于远程客户端上的情况,请参阅此答案。

解决方案 3:

是的,批量插入是将大文件加载到数据库的正确方法。乍一看,我认为它花费这么长时间的原因是您提到您正在循环遍历文件中的每一行数据,这实际上意味着您正在消除使用批量插入的好处并将其变成普通插入。只需记住,因为它的名称暗示它用于插入数据块。我会删除循环并重试。

另外我会仔细检查批量插入的语法,因为它对我来说看起来不正确。检查 pyodbc 生成的 sql,因为我觉得它可能只执行正常插入

或者,如果速度仍然很慢,我会尝试直接从 sql 中使用批量插入,或者使用批量插入将整个文件加载到临时表中,然后将相关列插入到正确的表中。或者混合使用批量插入和 bcp 来插入特定的列或 OPENROWSET。

解决方案 4:

这个问题让我很沮丧,直到我在 SO 上找到这篇文章,我才看到使用情况有了很大改善fast_executemany。具体来说,Bryan Bailliache 关于 max varchar 的评论。我一直在使用 SQLAlchemy,甚至确保更好的数据类型参数也无法为我解决问题;但是,切换到 pyodbc 就可以了。我还采纳了 Michael Moura 的建议,使用临时表,发现它节省了更多时间。我编写了一个函数,以防有人会觉得它有用。我编写它是为了插入一个列表或列表列表。使用 SQLAlchemy 和 Pandas 插入相同的数据to_sql,从有时需要 40 分钟以上减少到不到 4 秒。不过,我可能误用了我以前的方法。

联系

def mssql_conn():
    conn = pyodbc.connect(driver='{ODBC Driver 17 for SQL Server}',
                          server=os.environ.get('MS_SQL_SERVER'),
                          database='EHT',
                          uid=os.environ.get('MS_SQL_UN'),
                          pwd=os.environ.get('MS_SQL_PW'),
                          autocommit=True)
    return conn

插入函数

def mssql_insert(table,val_lst,truncate=False,temp_table=False):
    '''Use as direct connection to database to insert data, especially for
       large inserts. Takes either a single list (for one row),
       or list of list (for multiple rows). Can either append to table
       (default) or if truncate=True, replace existing.'''
    conn = mssql_conn()
    cursor = conn.cursor()
    cursor.fast_executemany = True
    tt = False
    qm = '?,'
    if isinstance(val_lst[0],list):
        rows = len(val_lst)
        params = qm * len(val_lst[0])
    else:
        rows = 1
        params = qm * len(val_lst)
        val_lst = [val_lst]
    params = params[:-1]
    if truncate:
        cursor.execute(f"TRUNCATE TABLE {table}")
    if temp_table:
        #create a temp table with same schema
        start_time = time.time()
        cursor.execute(f"SELECT * INTO ##{table} FROM {table} WHERE 1=0")
        table = f"##{table}"
        #set flag to indicate temp table was used
        tt = True
    else:
        start_time = time.time()
    #insert into either existing table or newly created temp table
    stmt = f"INSERT INTO {table} VALUES ({params})"
    cursor.executemany(stmt,val_lst)
    if tt:
        #remove temp moniker and insert from temp table
        dest_table = table[2:]
        cursor.execute(f"INSERT INTO {dest_table} SELECT * FROM {table}")
        print('Temp table used!')
        print(f'{rows} rows inserted into the {dest_table} table in {time.time() - 
              start_time} seconds')
    else:
        print('No temp table used!')
        print(f'{rows} rows inserted into the {table} table in {time.time() - 
              start_time} seconds')
    cursor.close()
    conn.close()

我的控制台结果首先使用临时表,然后不使用临时表(在这两种情况下,表在执行时包含数据并且 Truncate=True):

No temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 10.595500707626343 
seconds

Temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 3.810380458831787 
seconds

解决方案 5:

仅供参考,我给出了一些我自己的 SQL Server 插入方法。实际上,我能够通过使用 SQL Server 批处理和 pyodbcCursor.execute 语句获得最快的结果。我没有测试保存到 csv 和 BULK INSERT,我想知道它如何比较。

这是我进行的测试的博客:
http://jonmorisissqlblog.blogspot.com/2021/05/python-pyodbc-and-batch-inserts-to-sql.html

解决方案 6:

补充戈登汤普森的回答:

# add the below line for controlling batch size of insert
cursor.fast_executemany_rows = batch_size # by default it is 1000
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2941  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1803  
  PLM(产品生命周期管理)系统在企业的产品研发、生产与管理过程中扮演着至关重要的角色。然而,在实际运行中,资源冲突是经常会遇到的难题。资源冲突可能导致项目进度延迟、成本增加以及产品质量下降等一系列问题,严重影响企业的效益与竞争力。因此,如何有效应对PLM系统中的资源冲突,成为众多企业关注的焦点。接下来,我们将详细探讨5...
plm项目管理系统   31  
  敏捷项目管理与产品生命周期管理(PLM)的融合,正成为企业在复杂多变的市场环境中提升研发效率、增强竞争力的关键举措。随着技术的飞速发展和市场需求的快速更迭,传统的研发流程面临着诸多挑战,而将敏捷项目管理理念融入PLM,有望在2025年实现研发流程的深度优化,为企业创造更大的价值。理解敏捷项目管理与PLM的核心概念敏捷项...
plm项目   31  
  模块化设计在现代产品开发中扮演着至关重要的角色,它能够提升产品开发效率、降低成本、增强产品的可维护性与可扩展性。而产品生命周期管理(PLM)系统作为整合产品全生命周期信息的关键平台,对模块化设计有着强大的支持能力。随着技术的不断发展,到 2025 年,PLM 系统在支持模块化设计方面将有一系列令人瞩目的技术实践。数字化...
plm软件   28  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用