如何通过 SQL 查询创建大型 Pandas 数据框而不会耗尽内存?

2025-02-27 09:05:00
admin
原创
70
摘要:问题描述:我在从 MS SQL Server 数据库中查询包含超过 5 百万条记录的表时遇到了麻烦。我想选择所有记录,但是当选择太多数据到内存中时,我的代码似乎失败了。这有效:import pandas.io.sql as psql sql = "SELECT TOP 1000000 * FROM ...

问题描述:

我在从 MS SQL Server 数据库中查询包含超过 5 百万条记录的表时遇到了麻烦。我想选择所有记录,但是当选择太多数据到内存中时,我的代码似乎失败了。

这有效:

import pandas.io.sql as psql
sql = "SELECT TOP 1000000 * FROM MyTable" 
data = psql.read_frame(sql, cnxn)

...但这不起作用:

sql = "SELECT TOP 2000000 * FROM MyTable" 
data = psql.read_frame(sql, cnxn)

它返回此错误:

File "inference.pyx", line 931, in pandas.lib.to_object_array_tuples
(pandaslib.c:42733) Memory Error

我在这里读到,从 csv 文件创建时也存在类似的问题dataframe,解决方法是使用“迭代器”和“块大小”参数,如下所示:

read_csv('exp4326.csv', iterator=True, chunksize=1000)

是否有类似的解决方案可以从 SQL 数据库进行查询?如果没有,首选的解决方法是什么?我应该使用其他方法来分块读取记录吗?我在这里读了一些关于在 pandas 中处理大型数据集的讨论,但执行 SELECT * 查询似乎需要做很多工作。当然有一种更简单的方法。


解决方案 1:

正如评论中提到的那样,从 pandas 0.15 开始,您可以使用 chunksize 选项来read_sql逐块读取和处理查询块:

sql = "SELECT * FROM My_Table"
for chunk in pd.read_sql_query(sql , engine, chunksize=5):
    print(chunk)

参考:http ://pandas.pydata.org/pandas-docs/version/0.15.2/io.html#querying

解决方案 2:

更新:请务必查看下面的答案,因为 Pandas 现在已内置对分块加载的支持。

您可以简单地尝试逐块读取输入表,然后从各个部分组装出完整的数据框,如下所示:

import pandas as pd
import pandas.io.sql as psql
chunk_size = 10000
offset = 0
dfs = []
while True:
  sql = "SELECT * FROM MyTable limit %d offset %d order by ID" % (chunk_size,offset) 
  dfs.append(psql.read_frame(sql, cnxn))
  offset += chunk_size
  if len(dfs[-1]) < chunk_size:
    break
full_df = pd.concat(dfs)

也可能整个数据框太大而无法放入内存,在这种情况下,您别无选择,只能限制所选择的行数或列数。

解决方案 3:

代码解答及备注。

# Create empty list
dfl = []  

# Create empty dataframe
dfs = pd.DataFrame()  

# Start Chunking
for chunk in pd.read_sql(query, con=conct, ,chunksize=10000000):

    # Start Appending Data Chunks from SQL Result set into List
    dfl.append(chunk)

# Start appending data from list to dataframe
dfs = pd.concat(dfl, ignore_index=True)

然而,我的内存分析告诉我,即使在提取每个块之后释放内存,但列表会越来越大并占用该内存,导致可用 RAM 没有任何收益。

很想听听作者/其他人的看法。

解决方案 4:

我发现处理此问题的最佳方法是利用 SQLAlchemy steam_results 连接选项

conn = engine.connect().execution_options(stream_results=True)

并将 conn 对象传递给 pandas

pd.read_sql("SELECT *...", conn, chunksize=10000)

这将确保光标在服务器端而不是客户端处理

解决方案 5:

您可以使用服务器端游标(又名流结果)

import pandas as pd
from sqlalchemy import create_engine

def process_sql_using_pandas():
    engine = create_engine(
        "postgresql://postgres:pass@localhost/example"
    )
    conn = engine.connect().execution_options(
        stream_results=True)

    for chunk_dataframe in pd.read_sql(
            "SELECT * FROM users", conn, chunksize=1000):
        print(f"Got dataframe w/{len(chunk_dataframe)} rows")
        # ... do something with dataframe ...

if __name__ == '__main__':
    process_sql_using_pandas()

正如其他人在评论中提到的那样,使用chunksize参数pd.read_sql("SELECT * FROM users", engine, chunksize=1000)并不能解决问题,因为它仍然会将整个数据加载到内存中,然后逐块提供给您。

更多解释请点击此处

解决方案 6:

chunksize 仍然会加载内存中的所有数据,stream_results = True 是答案。它是服务器端游标,可加载给定块中的行并节省内存。在许多管道中有效使用,当您加载历史数据时它也可能有所帮助

stream_conn = engine.connect().execution_options(stream_results=True)

使用带有chunksize的pd.read_sql

pd.read_sql("SELECT * FROM SOURCE", stream_conn , chunksize=5000)

解决方案 7:

这是一行代码。我能够将 4900 万条记录加载到数据框中,而不会耗尽内存。

dfs = pd.concat(pd.read_sql(sql, engine, chunksize=500000), ignore_index=True)

解决方案 8:

您可以更新版本 airflow。例如,我在使用 docker-compose 的 2.2.3 版本中遇到了该错误。

  • AIRFLOW__CORE__EXECUTOR=CeleryExecutor

mysq 6.7

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

redis:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "250M" 

气流网络服务器:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

气流调度程序:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

气流工作者:

#cpus: "0.5"
#mem_reservation: "10M"
#mem_limit: "750M"

错误:任务退出,返回代码为 Negsignal.SIGKILL

但更新到版本 FROM apache/airflow:2.3.4。

并使用在 docker-compose 中配置的相同资源顺利执行拉取操作

在此处输入图片描述

我的 dag 提取器:

功能

def getDataForSchema(表,连接,tmp_path,**kwargs):

conn=connect_sql_server(conecction)

query_count= f"select count(1) from {table['schema']}.{table['table_name']}"
logging.info(f"query: {query_count}")
real_count_rows = pd.read_sql_query(query_count, conn) 

##sacar  esquema de la tabla
metadataquery=f"SELECT COLUMN_NAME ,DATA_TYPE  FROM information_schema.columns \n    where table_name = '{table['table_name']}' and table_schema= '{table['schema']}'"
#logging.info(f"query metadata: {metadataquery}")                
metadata = pd.read_sql_query(metadataquery, conn) 
schema=generate_schema(metadata)

#logging.info(f"schema : {schema}")
#logging.info(f"schema: {schema}")

#consulta la tabla a extraer
query=f" SELECT  {table['custom_column_names']} FROM {table['schema']}.{table['table_name']} "
logging.info(f"quere data :{query}")
chunksize=table["partition_field"]
data = pd.read_sql_query(query, conn, chunksize=chunksize)

count_rows=0
pqwriter=None
iteraccion=0
for df_row in data:       
    print(f"bloque  {iteraccion} de  total {count_rows} de un total {real_count_rows.iat[0, 0]}")
    #logging.info(df_row.to_markdown())
    if iteraccion == 0:
        parquetName=f"{tmp_path}/{table['table_name']}_{iteraccion}.parquet"
        pqwriter = pq.ParquetWriter(parquetName,schema)
    tableData = pa.Table.from_pandas(df_row, schema=schema,safe=False, preserve_index=True)
    #logging.info(f" tabledata {tableData.column(17)}")
    pqwriter.write_table(tableData)
    #logging.info(f"parquet name:::{parquetName}")
    ##pasar a parquet df directo
    #df_row.to_parquet(parquetName)
    iteraccion=iteraccion+1
    count_rows += len(df_row)
    del df_row
    del tableData
if pqwriter:
    print("Cerrando archivo parquet")
    pqwriter.close()
del data
del chunksize
del iteraccion

解决方案 9:

使用sqlalchemywith运算符的完整单行代码:

db_engine = sqlalchemy.create_engine(db_url, pool_size=10, max_overflow=20)
with Session(db_engine) as session:
    sql_qry = text("Your query")
    data = pd.concat(pd.read_sql(sql_qry,session.connection().execution_options(stream_results=True), chunksize=500000), ignore_index=True)

您可以尝试更改块大小以找到适合您情况的最佳大小。

解决方案 10:

您可以使用 chunksize 选项,但如果存在 RAM 问题,则需要将其设置为 6-7 位数字。

对于 pd.read_sql 中的块(sql、引擎、参数 =(fromdt、todt、filecode)、chunksize=100000):
 df1.附加(块)
 dfs = pd.concat(df1,ignore_index = True)

做这个

解决方案 11:

如果您想限制输出的行数,只需使用:

data = psql.read_frame(sql, cnxn,chunksize=1000000).__next__()
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   3892  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   2717  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Freshdesk、ClickUp、nTask、Hubstaff、Plutio、Productive、Targa、Bonsai、Wrike。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在项目管理过程中面临着诸多痛点,如任务分配不...
项目管理系统   52  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Monday、TeamGantt、Filestage、Chanty、Visor、Smartsheet、Productive、Quire、Planview。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多项目经理和团队在管理复杂项目时,常...
开源项目管理工具   54  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Smartsheet、GanttPRO、Backlog、Visor、ResourceGuru、Productive、Xebrio、Hive、Quire。在当今快节奏的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在选择项目管理工具时常常面临困惑:...
项目管理系统   49  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用