将Dataframe直接保存到csv到s3 Python

2025-04-15 09:20:00
admin
原创
32
摘要:问题描述:我有一个 pandas DataFrame 数据帧,想将其上传到一个新的 CSV 文件。问题是我不想在将文件传输到 S3 之前先将其保存在本地。有没有类似 to_csv 的方法可以将数据帧直接写入 S3?我正在使用 boto3。 以下是我目前得到的结果:import boto3 s3 = boto...

问题描述:

我有一个 pandas DataFrame 数据帧,想将其上传到一个新的 CSV 文件。问题是我不想在将文件传输到 S3 之前先将其保存在本地。有没有类似 to_csv 的方法可以将数据帧直接写入 S3?我正在使用 boto3。

以下是我目前得到的结果:

import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])

# Make alterations to DataFrame

# Then export DataFrame to CSV through direct transfer to s3

解决方案 1:

您可以使用:

from io import StringIO # python3; python2: BytesIO 
import boto3

bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())

此外,如果安装了 fspec,pandas现在还可以通过fspec处理远程文件的读写。换句话说,在这种情况下也可以正常工作。df.to_csv('s3://bucket/folder/path/file.csv)

解决方案 2:

您可以直接使用 S3 路径。我使用的是Pandas 0.24.1

In [1]: import pandas as pd

In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])

In [3]: df
Out[3]:
   a  b  c
0  1  1  1
1  2  2  2

In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)

In [5]: pd.__version__
Out[5]: '0.24.1'

In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')

In [7]: new_df
Out[7]:
   a  b  c
0  1  1  1
1  2  2  2

发行说明:

S3 文件处理

pandas 现在使用 s3fs 来处理 S3 连接。这不会破坏任何代码。但是,由于 s3fs 不是必需的依赖项,因此您需要单独安装它,就像以前版本 pandas 中的 boto 一样。GH11915。

解决方案 3:

我喜欢s3fs,它让你可以(几乎)像使用本地文件系统一样使用 s3。

您可以这样做:

import s3fs

bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
    f.write(bytes_to_write)

s3fs仅支持打开文件的模式,这就是我做这些rb事情的原因。wb`bytes_to_write`

解决方案 4:

这是一个更新的答案:

import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
    df.to_csv(f)

StringIO 的问题在于它会消耗内存。使用这种方法,您将文件流式传输到 s3,而不是将其转换为字符串,然后再写入 s3。将 Pandas DataFrame 及其字符串副本保存在内存中似乎非常低效。

如果您正在 EC2 实例中工作,可以为其授予 IAM 角色以将其写入 S3,这样您无需直接传递凭证。但是,您也可以通过将凭证传递给S3FileSystem()函数来连接到存储桶。请参阅文档:https://s3fs.readthedocs.io/en/latest/

解决方案 5:

您还可以使用AWS Data Wrangler:

import awswrangler as wr
    
wr.s3.to_csv(
    df=df,
    path="s3://...",
)

请注意,它将为您处理分段上传,以加快上传速度。

解决方案 6:

如果您将其None作为第一个参数传递to_csv(),数据将以字符串形式返回。之后,只需一步即可轻松将其一次性上传到 S3。

还应该可以将StringIO对象传递给to_csv(),但使用字符串会更容易。

解决方案 7:

client我发现这不仅可以使用,还可以使用resource

from io import StringIO
import boto3
s3 = boto3.client("s3",\n                  region_name=region_name,\n                  aws_access_key_id=aws_access_key_id,\n                  aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')

解决方案 8:

我使用AWS Data Wrangler。例如:

import awswrangler as wr
import pandas as pd

# read a local dataframe
df = pd.read_parquet('my_local_file.gz')

# upload to S3 bucket
wr.s3.to_parquet(df=df, path='s3://mys3bucket/file_name.gz')

这同样适用于 csv 文件。不要使用read_parquetto_parquet,而要使用read_csv和 ,to_csv并加上正确的文件扩展名。

解决方案 9:

您可以使用

  • 熊猫

  • 博托3

  • s3fs(版本≤0.4)

我在路径中to_csv使用s3://`storage_options`

key = "folder/file.csv"

df.to_csv(
    f"s3://{YOUR_S3_BUCKET}/{key}",
    index=False,
    storage_options={
        "key": AWS_ACCESS_KEY_ID,
        "secret": AWS_SECRET_ACCESS_KEY,
        "token": AWS_SESSION_TOKEN,
    },

解决方案 10:

由于您正在使用boto3.client(),请尝试:

import boto3
from io import StringIO #python3 
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
    csv_buf = StringIO()
    df.to_csv(csv_buf, header=True, index=False)
    csv_buf.seek(0)
    client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
    print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')

copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')

解决方案 11:

from io import StringIO
import boto3
#Creating Session With Boto3.
session = boto3.Session(
aws_access_key_id='<your_access_key_id>',
aws_secret_access_key='<your_secret_access_key>'
)
#Creating S3 Resource From the Session.
s3_res = session.resource('s3')
csv_buffer = StringIO()
df.to_csv(csv_buffer)
bucket_name = 'stackvidhya'
s3_object_name = 'df.csv'
s3_res.Object(bucket_name, s3_object_name).put(Body=csv_buffer.getvalue())
print("Dataframe is saved as CSV in S3 bucket.")

解决方案 12:

为了有效地处理大文件,您还可以使用与 S3 兼容的开源 MinIO 及其minio python 客户端包,就像我的这个函数一样:

import minio
import os
import pandas as pd

minio_client = minio.Minio(..)

def write_df_to_minio(df, 
                    minio_client, 
                    bucket_name, 
                    file_name="new-file.csv",
                    local_temp_folder="/tmp/", 
                    content_type="application/csv",
                    sep=",",
                    save_row_index=False):

    df.to_csv(os.path.join(local_temp_folder, file_name), sep=sep, index=save_row_index)
    
    minio_results = minio_client.fput_object(bucket_name=bucket_name,
                                             object_name=file_name,
                                             file_path=os.path.join(local_temp_folder, file_name),
                                             content_type=content_type)

    assert minio_results.object_name == file_name

解决方案 13:

另一个选择是使用cloudpathlib来实现,它支持 S3、Google Cloud Storage 和 Azure Blob Storage。请参阅下面的示例。

import pandas as pd
from cloudpathlib import CloudPath

# read data from S3
df = pd.read_csv(CloudPath("s3://covid19-lake/rearc-covid-19-testing-data/csv/states_daily/states_daily.csv"))

# look at some of the data
df.head(1).T.iloc[:10]
#>                                       0
#> date                           20210307
#> state                                AK
#> positive                        56886.0
#> probableCases                       NaN
#> negative                            NaN
#> pending                             NaN
#> totalTestResultsSource  totalTestsViral
#> totalTestResults              1731628.0
#> hospitalizedCurrently              33.0
#> hospitalizedCumulative           1293.0

# writing to S3
with CloudPath("s3://bucket-you-can-write-to/data.csv").open("w") as f:
    df.to_csv(f)

CloudPath("s3://bucket-you-can-write-to/data.csv").exists()
#> True

df.to_csv(CloudPath("s3://drivendata-public-assets/test-asdf2.csv"))请注意,由于 pandas 处理传递给它的路径/句柄的方式,您无法直接调用。相反,您需要打开要写入的文件,然后将该句柄直接传递给to_csv

这在设置特定选项或不同的身份验证机制或保持持久缓存方面带来了一些额外的好处,因此您不必总是从 S3 重新下载。

解决方案 14:

我从 bucket s3 中读取了一个包含两列的 csv,并将文件 csv 的内容放入了 pandas 数据框中。

例子:

配置.json

{
  "credential": {
    "access_key":"xxxxxx",
    "secret_key":"xxxxxx"
}
,
"s3":{
       "bucket":"mybucket",
       "key":"csv/user.csv"
   }
}

cls_config.json

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import json

class cls_config(object):

    def __init__(self,filename):

        self.filename = filename


    def getConfig(self):

        fileName = os.path.join(os.path.dirname(__file__), self.filename)
        with open(fileName) as f:
        config = json.load(f)
        return config

cls_pandas.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
import io

class cls_pandas(object):

    def __init__(self):
        pass

    def read(self,stream):

        df = pd.read_csv(io.StringIO(stream), sep = ",")
        return df

cls_s3.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import boto3
import json

class cls_s3(object):

    def  __init__(self,access_key,secret_key):

        self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def getObject(self,bucket,key):

        read_file = self.s3.get_object(Bucket=bucket, Key=key)
        body = read_file['Body'].read().decode('utf-8')
        return body

测试.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from cls_config import *
from cls_s3 import *
from cls_pandas import *

class test(object):

    def __init__(self):
        self.conf = cls_config('config.json')

    def process(self):

        conf = self.conf.getConfig()

        bucket = conf['s3']['bucket']
        key = conf['s3']['key']

        access_key = conf['credential']['access_key']
        secret_key = conf['credential']['secret_key']

        s3 = cls_s3(access_key,secret_key)
        ob = s3.getObject(bucket,key)

        pa = cls_pandas()
        df = pa.read(ob)

        print df

if __name__ == '__main__':
    test = test()
    test.process()
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用