如何将 DataFrame 写入 postgres 表

2025-02-17 09:25:00
admin
原创
99
摘要:问题描述:有DataFrame.to_sql方法,但它仅适用于 mysql、sqlite 和 oracle 数据库。我无法将 postgres 连接或 sqlalchemy 引擎传递给此方法。解决方案 1:从 pandas 0.14(2014 年 5 月底发布)开始,支持 postgresql。该sql模块现...

问题描述:

DataFrame.to_sql方法,但它仅适用于 mysql、sqlite 和 oracle 数据库。我无法将 postgres 连接或 sqlalchemy 引擎传递给此方法。


解决方案 1:

从 pandas 0.14(2014 年 5 月底发布)开始,支持 postgresql。该sql模块现在用于sqlalchemy支持不同的数据库类型。您可以为 postgresql 数据库传递一个 sqlalchemy 引擎(请参阅文档)。例如:

from sqlalchemy import create_engine
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

您说得对,在 Pandas 0.13.1 之前的版本中,postgresql 不受支持。如果您需要使用旧版本的 Pandas,这里有一个修补版本pandas.io.sql:https ://gist.github.com/jorisvandenbossche/10841234 。

这是我以前写的,所以不能完全保证它总是有效,但基础应该在那里)。如果您将该文件放在工作目录中并导入它,那么您应该能够执行(conpostgresql 连接在哪里):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

解决方案 2:

更快的选择:

以下代码将以比 df.to_sql 方法快得多的速度将您的 Pandas DF 复制到 postgres DB,并且您不需要任何中间 csv 文件来存储 df。

根据您的数据库规范创建一个引擎。

在您的 postgres DB 中创建一个表,其列数与 Dataframe (df) 相同。

DF 中的数据将插入到您的 postgres 表中。

from sqlalchemy import create_engine
import psycopg2 
import io

如果要替换表,我们可以使用来自 df 的标题将其替换为普通的 to_sql 方法,然后将整个耗时的 df 加载到 DB 中。

engine = create_engine(
    'postgresql+psycopg2://username:password@host:port/database')

# Drop old table and create new empty table
df.head(0).to_sql('table_name', engine, if_exists='replace',index=False)

conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='    ', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()
cur.close()
conn.close()

解决方案 3:

Pandas 0.24.0+解决方案

Pandas 0.24.0 引入了一项新功能,专门用于快速写入 Postgres。您可以在此处了解更多信息:https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)

解决方案 4:

我是这样做的。

它可能更快,因为它使用execute_batch

# df is the dataframe
if len(df) > 0:
    df_columns = list(df)
    # create (col1,col2,...)
    columns = ",".join(df_columns)

    # create VALUES('%s', '%s",...) one '%s' per column
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) 

    #create INSERT INTO table (columns) VALUES('%s',...)
    insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)

    cur = conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()

解决方案 5:

将 df 写入具有/不具有索引的自定义模式中的表的更快方法:

"""
Faster way to write df to table.
Slower way is to use df.to_sql()
"""

from io import StringIO

from pandas import DataFrame
from sqlalchemy.engine.base import Engine


class WriteDfToTableWithIndexMixin:
    @classmethod
    def write_df_to_table_with_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=True, index_label='id')

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='    ', header=False,
                  index=True, index_label='id')
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()


class WriteDfToTableWithoutIndexMixin:
    @classmethod
    def write_df_to_table_without_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=False)

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='    ', header=False, index=False)
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()

如果您的 df 中的某一列包含 JSON 值,则上述方法仍将正确加载所有数据,但 json 列将具有一些奇怪的格式。因此将该 json 列转换为::json可能会产生错误。您必须使用to_sql()。添加method=multi以加快速度并添加chunksize以防止您的机器冻结:

df.to_sql(table_name, engine, if_exists='replace', schema=schema_name, index=False, method='multi', chunksize=1000)

解决方案 6:

使用 psycopg2,您可以使用本机 sql 命令将数据写入 postgres 表。

import psycopg2
import pandas as pd

conn = psycopg2.connect("dbname='{db}' user='{user}' host='{host}' port='{port}' password='{passwd}'".format(
            user=pg_user,
            passwd=pg_pass,
            host=pg_host,
            port=pg_port,
            db=pg_db))
cur = conn.cursor()    
def insertIntoTable(df, table):
        """
        Using cursor.executemany() to insert the dataframe
        """
        # Create a list of tupples from the dataframe values
        tuples = list(set([tuple(x) for x in df.to_numpy()]))
    
        # Comma-separated dataframe columns
        cols = ','.join(list(df.columns))
        # SQL query to execute
        query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % (
            table, cols)
    
        try:
            cur.executemany(query, tuples)
            conn.commit()

        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1

解决方案 7:

对于 Python 2.7 和 Pandas 0.24.2 并使用 Psycopg2

Psycopg2 连接模块

def dbConnect (db_parm, username_parm, host_parm, pw_parm):
    # Parse in connection information
    credentials = {'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm}
    conn = psycopg2.connect(**credentials)
    conn.autocommit = True  # auto-commit each entry to the database
    conn.cursor_factory = RealDictCursor
    cur = conn.cursor()
    print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
    return conn, cur

连接到数据库

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

假设数据框已经作为 df 存在

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='    ', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '    ' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()

解决方案 8:

创建引擎(其中方言='postgres'或'mysql'等):
from sqlalchemy import create_engine
engine = create_engine(f'{dialect}://{user_name}@{host}:{port}/{db_name}')
Session = sessionmaker(bind=engine) 

with Session() as session:
    df = pd.read_csv(path + f'/{file}') 
    df.to_sql('table_name', con=engine, if_exists='append',index=False)
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   3983  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   2747  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Freshdesk、ClickUp、nTask、Hubstaff、Plutio、Productive、Targa、Bonsai、Wrike。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在项目管理过程中面临着诸多痛点,如任务分配不...
项目管理系统   82  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Monday、TeamGantt、Filestage、Chanty、Visor、Smartsheet、Productive、Quire、Planview。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多项目经理和团队在管理复杂项目时,常...
开源项目管理工具   90  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Smartsheet、GanttPRO、Backlog、Visor、ResourceGuru、Productive、Xebrio、Hive、Quire。在当今快节奏的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在选择项目管理工具时常常面临困惑:...
项目管理系统   79  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用