Boto3 从 S3 Bucket 下载所有文件

2025-03-18 08:56:00
admin
原创
61
摘要:问题描述:我正在使用 boto3 从 s3 bucket 获取文件。我需要类似的功能aws s3 sync我现在的代码是#!/usr/bin/python import boto3 s3=boto3.client('s3') list=s3.list_objects(Bucket='my_bucket_nam...

问题描述:

我正在使用 boto3 从 s3 bucket 获取文件。我需要类似的功能aws s3 sync

我现在的代码是

#!/usr/bin/python
import boto3
s3=boto3.client('s3')
list=s3.list_objects(Bucket='my_bucket_name')['Contents']
for key in list:
    s3.download_file('my_bucket_name', key['Key'], key['Key'])

只要存储桶中只有文件,这种方法就没问题。如果存储桶内有文件夹,则会抛出错误

Traceback (most recent call last):
  File "./test", line 6, in <module>
    s3.download_file('my_bucket_name', key['Key'], key['Key'])
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/inject.py", line 58, in download_file
    extra_args=ExtraArgs, callback=Callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 651, in download_file
    extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 666, in _download_file
    self._get_object(bucket, key, filename, extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 690, in _get_object
    extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 707, in _do_get_object
    with self._osutil.open(filename, 'wb') as f:
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 323, in open
    return open(filename, mode)
IOError: [Errno 2] No such file or directory: 'my_folder/.8Df54234'

这是使用 boto3 下载完整 s3 存储桶的正确方法吗?如何下载文件夹。


解决方案 1:

我有同样的需求,并创建了以下递归下载文件的函数。

仅当目录包含文件时才会在本地创建。

import boto3
import os

def download_dir(client, resource, dist, local='/tmp', bucket='your_bucket'):
    paginator = client.get_paginator('list_objects')
    for result in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        if result.get('CommonPrefixes') is not None:
            for subdir in result.get('CommonPrefixes'):
                download_dir(client, resource, subdir.get('Prefix'), local, bucket)
        for file in result.get('Contents', []):
            dest_pathname = os.path.join(local, file.get('Key'))
            if not os.path.exists(os.path.dirname(dest_pathname)):
                os.makedirs(os.path.dirname(dest_pathname))
            if not file.get('Key').endswith('/'):
                resource.meta.client.download_file(bucket, file.get('Key'), dest_pathname)

该函数的调用方式如下:

def _start():
    client = boto3.client('s3')
    resource = boto3.resource('s3')
    download_dir(client, resource, 'clientconf/', '/tmp', bucket='my-bucket')

解决方案 2:

当使用包含 1000 多个对象的存储桶时,需要实施一个解决方案,该解决方案使用NextContinuationToken最多 1000 个键的连续集合。此解决方案首先编译对象列表,然后迭代创建指定目录并下载现有对象。

import boto3
import os

s3_client = boto3.client('s3')

def download_dir(prefix, local, bucket, client=s3_client):
    """
    params:
    - prefix: pattern to match in s3
    - local: local path to folder in which to place files
    - bucket: s3 bucket with target contents
    - client: initialized s3 client object
    """
    keys = []
    dirs = []
    next_token = ''
    base_kwargs = {
        'Bucket':bucket,
        'Prefix':prefix,
    }
    while next_token is not None:
        kwargs = base_kwargs.copy()
        if next_token != '':
            kwargs.update({'ContinuationToken': next_token})
        results = client.list_objects_v2(**kwargs)
        contents = results.get('Contents')
        for i in contents:
            k = i.get('Key')
            if k[-1] != '/':
                keys.append(k)
            else:
                dirs.append(k)
        next_token = results.get('NextContinuationToken')
    for d in dirs:
        dest_pathname = os.path.join(local, d)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
    for k in keys:
        dest_pathname = os.path.join(local, k)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
        client.download_file(bucket, k, dest_pathname)

解决方案 3:

import os
import boto3

#initiate s3 resource
s3 = boto3.resource('s3')

# select bucket
my_bucket = s3.Bucket('my_bucket_name')

# download file into current directory
for s3_object in my_bucket.objects.all():
    # Need to split s3_object.key into path and file name, else it will give error file not found.
    path, filename = os.path.split(s3_object.key)
    my_bucket.download_file(s3_object.key, filename)

解决方案 4:

Amazon S3 没有文件夹/目录。它是一个平面文件结构

为了保持目录的外观,路径名被存储为对象 Key(文件名)的一部分。例如:

  • images/foo.jpg

在这种情况下,整个 Key 是images/foo.jpg,而不仅仅是foo.jpg

我怀疑您的问题是boto返回了一个名为的文件my_folder/.8Df54234并尝试将其保存到本地文件系统。但是,您的本地文件系统将该my_folder/部分解释为目录名称,而该目录在您的本地文件系统中不存在

您可以截断文件名以仅保存.8Df54234部分内容,或者必须在写入文件之前创建必要的目录。请注意,它可以是多层嵌套目录。

一种更简单的方法是使用AWS 命令​​行界面 (CLI),它将为您完成所有这些工作,例如:

aws s3 cp --recursive s3://my_bucket_name local_folder

还有一个sync选项可以仅复制新的和修改过的文件。

解决方案 5:

我目前正在通过使用以下方法完成任务

#!/usr/bin/python
import boto3
s3=boto3.client('s3')
list=s3.list_objects(Bucket='bucket')['Contents']
for s3_key in list:
    s3_object = s3_key['Key']
    if not s3_object.endswith("/"):
        s3.download_file('bucket', s3_object, s3_object)
    else:
        import os
        if not os.path.exists(s3_object):
            os.makedirs(s3_object)

虽然它确实能完成工作,但我不确定这样做是否好。我把它留在这里是为了帮助其他用户和进一步的回答,以更好的方式实现这一点

解决方案 6:

迟做总比不做好:)之前关于分页器的回答确实很好。但是它是递归的,最终可能会达到 Python 的递归限制。这是另一种方法,需要进行一些额外的检查。

import os
import errno
import boto3


def assert_dir_exists(path):
    """
    Checks if directory tree in path exists. If not it created them.
    :param path: the path to check if it exists
    """
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def download_dir(client, bucket, path, target):
    """
    Downloads recursively the given S3 path to the target directory.
    :param client: S3 client to use.
    :param bucket: the name of the bucket to download from
    :param path: The S3 directory to download.
    :param target: the local directory to download the files to.
    """

    # Handle missing / at end of prefix
    if not path.endswith('/'):
        path += '/'

    paginator = client.get_paginator('list_objects_v2')
    for result in paginator.paginate(Bucket=bucket, Prefix=path):
        # Download each file individually
        for key in result['Contents']:
            # Calculate relative path
            rel_path = key['Key'][len(path):]
            # Skip paths ending in /
            if not key['Key'].endswith('/'):
                local_file_path = os.path.join(target, rel_path)
                # Make sure directories exist
                local_file_dir = os.path.dirname(local_file_path)
                assert_dir_exists(local_file_dir)
                client.download_file(bucket, key['Key'], local_file_path)


client = boto3.client('s3')

download_dir(client, 'bucket-name', 'path/to/data', 'downloads')

解决方案 7:

这里的许多解决方案都非常复杂。如果您正在寻找更简单的解决方案,cloudpathlib请以良好的方式包装此用例,以便下载目录或文件。

from cloudpathlib import CloudPath

cp = CloudPath("s3://bucket/product/myproject/2021-02-15/")
cp.download_to("local_folder")

注意:对于包含大量文件的大型文件夹,awscli使用命令行可能会更快。

解决方案 8:

我有一个解决方法,即在相同的进程中运行 AWS CLI。

安装awscli为 python lib:

pip install awscli

然后定义这个函数:

from awscli.clidriver import create_clidriver

def aws_cli(*cmd):
    old_env = dict(os.environ)
    try:

        # Environment
        env = os.environ.copy()
        env['LC_CTYPE'] = u'en_US.UTF'
        os.environ.update(env)

        # Run awscli in the same process
        exit_code = create_clidriver().main(*cmd)

        # Deal with problems
        if exit_code > 0:
            raise RuntimeError('AWS CLI exited with code {}'.format(exit_code))
    finally:
        os.environ.clear()
        os.environ.update(old_env)

执行:

aws_cli('s3', 'sync', '/path/to/source', 's3://bucket/destination', '--delete')

解决方案 9:

来自 AWS S3 文档(如何使用 S3 存储桶中的文件夹?):

在 Amazon S3 中,存储桶和对象是主要资源,对象存储在存储桶中。Amazon S3 具有扁平结构,而不是像文件系统中那样的层次结构。但是,为了组织简单起见,Amazon S3 控制台支持文件夹概念作为对对象进行分组的一种方式。Amazon S3 通过使用对象的共享名称前缀来实现这一点(即,对象的名称以通用字符串开头)。对象名称也称为键名称。

例如,您可以在控制台上创建一个名为 photos 的文件夹,并在其中存储一个名为 myphoto.jpg 的对象。然后使用键名 photos/myphoto.jpg 存储该对象,其中 photos/ 是前缀。

要将所有文件从“mybucket”下载到当前目录中,并遵循存储桶的模拟目录结构(如果文件夹在本地不存在,则从存储桶中创建文件夹):

import boto3
import os

bucket_name = "mybucket"
s3 = boto3.client("s3")
objects = s3.list_objects(Bucket = bucket_name)["Contents"]
for s3_object in objects:
    s3_key = s3_object["Key"]
    path, filename = os.path.split(s3_key)
    if len(path) != 0 and not os.path.exists(path):
        os.makedirs(path)
    if not s3_key.endswith("/"):
        download_to = path + '/' + filename if path else filename
        s3.download_file(bucket_name, s3_key, download_to)

解决方案 10:

我已经更新了 Grant 的答案,以便并行运行,如果有人感兴趣的话,它会更快:

from concurrent import futures
import os
import boto3

def download_dir(prefix, local, bucket):

    client = boto3.client('s3')

    def create_folder_and_download_file(k):
        dest_pathname = os.path.join(local, k)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
        print(f'downloading {k} to {dest_pathname}')
        client.download_file(bucket, k, dest_pathname)

    keys = []
    dirs = []
    next_token = ''
    base_kwargs = {
        'Bucket': bucket,
        'Prefix': prefix,
    }
    while next_token is not None:
        kwargs = base_kwargs.copy()
        if next_token != '':
            kwargs.update({'ContinuationToken': next_token})
        results = client.list_objects_v2(**kwargs)
        contents = results.get('Contents')
        for i in contents:
            k = i.get('Key')
            if k[-1] != '/':
                keys.append(k)
            else:
                dirs.append(k)
        next_token = results.get('NextContinuationToken')
    for d in dirs:
        dest_pathname = os.path.join(local, d)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
    with futures.ThreadPoolExecutor() as executor:
        futures.wait(
            [executor.submit(create_folder_and_download_file, k) for k in keys],
            return_when=futures.FIRST_EXCEPTION,
        )

解决方案 11:

另一个使用 asyncio/aioboto 的并行下载器

import os, time
import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial
from pprint import pprint as pp

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


bucket_name= 'test-data'
bucket_prefix= 'etl2/test/20210330/f_api'


async def save_to_file(s3_client, bucket: str, key: str):
    
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()
    
    if 1:
        fn =f'out/downloaded/{bucket_name}/{key}'

        dn= os.path.dirname(fn)
        if not isdir(dn):
            os.makedirs(dn,exist_ok=True)
        if 1:
            with open(fn, 'wb') as fh:
                fh.write(content)
                print(f'Downloaded to: {fn}')
   
    return [0]


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of download statuses
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    async with session.create_client('s3', config=config) as client:
        worker_co = partial(save_to_file, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key'], client))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


def S3_download_bucket_files():
    s = time.perf_counter()
    _loop = asyncio.get_event_loop()
    _result = _loop.run_until_complete(go(bucket_name, bucket_prefix))
    assert sum(_result)==0, _result
    print(_result)
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

它将首先从 S3 获取文件列表,然后使用 aioboto 下载,_NUM_WORKERS=50 从网络并行读取数据。

解决方案 12:

如果您想使用 python 调用 bash 脚本,这里有一个简单的方法可以将文件从 S3 存储桶中的文件夹加载到本地文件夹(在 Linux 机器中):

import boto3
import subprocess
import os

###TOEDIT###
my_bucket_name = "your_my_bucket_name"
bucket_folder_name = "your_bucket_folder_name"
local_folder_path = "your_local_folder_path"
###TOEDIT###

# 1.Load thes list of files existing in the bucket folder
FILES_NAMES = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('{}'.format(my_bucket_name))
for object_summary in my_bucket.objects.filter(Prefix="{}/".format(bucket_folder_name)):
#     print(object_summary.key)
    FILES_NAMES.append(object_summary.key)

# 2.List only new files that do not exist in local folder (to not copy everything!)
new_filenames = list(set(FILES_NAMES )-set(os.listdir(local_folder_path)))

# 3.Time to load files in your destination folder 
for new_filename in new_filenames:
    upload_S3files_CMD = """aws s3 cp s3://{}/{}/{} {}""".format(my_bucket_name,bucket_folder_name,new_filename ,local_folder_path)

    subprocess_call = subprocess.call([upload_S3files_CMD], shell=True)
    if subprocess_call != 0:
        print("ALERT: loading files not working correctly, please re-check new loaded files")

解决方案 13:

一次性获取所有文件是一个非常糟糕的主意,您应该分批获取。

我用来从 S3 获取特定文件夹(目录)的一个实现是,

def get_directory(directory_path, download_path, exclude_file_names):
    # prepare session
    session = Session(aws_access_key_id, aws_secret_access_key, region_name)
    
    # get instances for resource and bucket
    resource = session.resource('s3')
    bucket = resource.Bucket(bucket_name)

    for s3_key in self.client.list_objects(Bucket=self.bucket_name, Prefix=directory_path)['Contents']:
        s3_object = s3_key['Key']
        if s3_object not in exclude_file_names:
            bucket.download_file(file_path, download_path + str(s3_object.split('/')[-1])

如果你仍然想获得整个存储桶,请通过 CLI 使用它,正如@John Rotenstein如下所述,

aws s3 cp --recursive s3://bucket_name download_path

解决方案 14:

for objs in my_bucket.objects.all():
    print(objs.key)
    path='/tmp/'+os.sep.join(objs.key.split(os.sep)[:-1])
    try:
        if not os.path.exists(path):
            os.makedirs(path)
        my_bucket.download_file(objs.key, '/tmp/'+objs.key)
    except FileExistsError as fe:                          
        print(objs.key+' exists')

此代码将下载目录中的内容/tmp/。如果您愿意,可以更改目录。

解决方案 15:

我得到了类似的要求,并通过阅读上述一些解决方案和其他网站获得了帮助,我提出了下面的脚本,只是想分享它是否可以帮助任何人。

from boto3.session import Session
import os

def sync_s3_folder(access_key_id,secret_access_key,bucket_name,folder,destination_path):    
    session = Session(aws_access_key_id=access_key_id,aws_secret_access_key=secret_access_key)
    s3 = session.resource('s3')
    your_bucket = s3.Bucket(bucket_name)
    for s3_file in your_bucket.objects.all():
        if folder in s3_file.key:
            file=os.path.join(destination_path,s3_file.key.replace('/','\\'))
            if not os.path.exists(os.path.dirname(file)):
                os.makedirs(os.path.dirname(file))
            your_bucket.download_file(s3_file.key,file)
sync_s3_folder(access_key_id,secret_access_key,bucket_name,folder,destination_path)

解决方案 16:

重新发布@glefait 的答案,并在末尾加上 if 条件,以避免出现操作系统错误 20。它获取的第一个键是文件夹名称本身,该名称不能写入目标路径。

def download_dir(client, resource, dist, local='/tmp', bucket='your_bucket'):
    paginator = client.get_paginator('list_objects')
    for result in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        if result.get('CommonPrefixes') is not None:
            for subdir in result.get('CommonPrefixes'):
                download_dir(client, resource, subdir.get('Prefix'), local, bucket)
        for file in result.get('Contents', []):
            print("Content: ",result)
            dest_pathname = os.path.join(local, file.get('Key'))
            print("Dest path: ",dest_pathname)
            if not os.path.exists(os.path.dirname(dest_pathname)):
                print("here last if")
                os.makedirs(os.path.dirname(dest_pathname))
            print("else file key: ", file.get('Key'))
            if not file.get('Key') == dist:
                print("Key not equal? ",file.get('Key'))
                resource.meta.client.download_file(bucket, file.get('Key'), dest_pathname)enter code here

解决方案 17:

我遇到这个问题已经有一段时间了,而且我浏览过的所有不同论坛都没有找到一个完整的端到端的片段。因此,我继续收集所有部分(自己添加了一些内容),并创建了一个完整的端到端 S3 下载器!

这不仅会自动下载文件,而且如果 S3 文件位于子目录中,它还会在本地存储中创建它们。在我的应用程序实例中,我需要设置权限和所有者,因此我也添加了它(如果不需要,可以注释掉)。

这已经过测试并且可以在 Docker 环境(K8)中运行,但我已在脚本中添加了环境变量,以防您想在本地测试/运行它。

我希望这可以帮助人们找到 S3 下载自动化。如果需要,我也欢迎任何关于如何更好地优化的建议、信息等。

#!/usr/bin/python3
import gc
import logging
import os
import signal
import sys
import time
from datetime import datetime

import boto
from boto.exception import S3ResponseError
from pythonjsonlogger import jsonlogger

formatter = jsonlogger.JsonFormatter('%(message)%(levelname)%(name)%(asctime)%(filename)%(lineno)%(funcName)')

json_handler_out = logging.StreamHandler()
json_handler_out.setFormatter(formatter)

#Manual Testing Variables If Needed
#os.environ["DOWNLOAD_LOCATION_PATH"] = "some_path"
#os.environ["BUCKET_NAME"] = "some_bucket"
#os.environ["AWS_ACCESS_KEY"] = "some_access_key"
#os.environ["AWS_SECRET_KEY"] = "some_secret"
#os.environ["LOG_LEVEL_SELECTOR"] = "DEBUG, INFO, or ERROR"

#Setting Log Level Test
logger = logging.getLogger('json')
logger.addHandler(json_handler_out)
logger_levels = {
    'ERROR' : logging.ERROR,
    'INFO' : logging.INFO,
    'DEBUG' : logging.DEBUG
}
logger_level_selector = os.environ["LOG_LEVEL_SELECTOR"]
logger.setLevel(logger_level_selector)

#Getting Date/Time
now = datetime.now()
logger.info("Current date and time : ")
logger.info(now.strftime("%Y-%m-%d %H:%M:%S"))

#Establishing S3 Variables and Download Location
download_location_path = os.environ["DOWNLOAD_LOCATION_PATH"]
bucket_name = os.environ["BUCKET_NAME"]
aws_access_key_id = os.environ["AWS_ACCESS_KEY"]
aws_access_secret_key = os.environ["AWS_SECRET_KEY"]
logger.debug("Bucket: %s" % bucket_name)
logger.debug("Key: %s" % aws_access_key_id)
logger.debug("Secret: %s" % aws_access_secret_key)
logger.debug("Download location path: %s" % download_location_path)

#Creating Download Directory
if not os.path.exists(download_location_path):
    logger.info("Making download directory")
    os.makedirs(download_location_path)

#Signal Hooks are fun
class GracefulKiller:
    kill_now = False
    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
    def exit_gracefully(self, signum, frame):
        self.kill_now = True

#Downloading from S3 Bucket
def download_s3_bucket():
    conn = boto.connect_s3(aws_access_key_id, aws_access_secret_key)
    logger.debug("Connection established: ")
    bucket = conn.get_bucket(bucket_name)
    logger.debug("Bucket: %s" % str(bucket))
    bucket_list = bucket.list()
#    logger.info("Number of items to download: {0}".format(len(bucket_list)))

    for s3_item in bucket_list:
        key_string = str(s3_item.key)
        logger.debug("S3 Bucket Item to download: %s" % key_string)
        s3_path = download_location_path + "/" + key_string
        logger.debug("Downloading to: %s" % s3_path)
        local_dir = os.path.dirname(s3_path)

        if not os.path.exists(local_dir):
            logger.info("Local directory doesn't exist, creating it... %s" % local_dir)
            os.makedirs(local_dir)
            logger.info("Updating local directory permissions to %s" % local_dir)
#Comment or Uncomment Permissions based on Local Usage
            os.chmod(local_dir, 0o775)
            os.chown(local_dir, 60001, 60001)
        logger.debug("Local directory for download: %s" % local_dir)
        try:
            logger.info("Downloading File: %s" % key_string)
            s3_item.get_contents_to_filename(s3_path)
            logger.info("Successfully downloaded File: %s" % s3_path)
            #Updating Permissions
            logger.info("Updating Permissions for %s" % str(s3_path))
#Comment or Uncomment Permissions based on Local Usage
            os.chmod(s3_path, 0o664)
            os.chown(s3_path, 60001, 60001)
        except (OSError, S3ResponseError) as e:
            logger.error("Fatal error in s3_item.get_contents_to_filename", exc_info=True)
            # logger.error("Exception in file download from S3: {}".format(e))
            continue
        logger.info("Deleting %s from S3 Bucket" % str(s3_item.key))
        s3_item.delete()

def main():
    killer = GracefulKiller()
    while not killer.kill_now:
        logger.info("Checking for new files on S3 to download...")
        download_s3_bucket()
        logger.info("Done checking for new files, will check in 120s...")
        gc.collect()
        sys.stdout.flush()
        time.sleep(120)
if __name__ == '__main__':
    main()

解决方案 18:

S3 组织文件的方式与 Windows 组织文件的方式有非常小的差异。下面是一个简单的自文档示例,解释了这些差异。

另外:将 amazon 文件名视为普通字符串。它们实际上并不代表文件夹。Amazon 会模拟文件夹,因此如果您尝试将文件塞入系统中不存在的文件夹的名称中,它无法确定将其放在哪里。因此,您必须在系统上为 S3 中的每个模拟文件夹创建一个文件夹。如果文件夹中有一个文件夹,请不要使用“mkdir(path)”,否则将不起作用。您必须使用“makedirs(path)”。还有一件事!-> PC 文件路径的格式很奇怪。Amazon 使用“/”,而 pc 使用“\”,并且整个文件名必须相同。查看下面的代码块(也显示身份验证)。

在我的示例中,我的存储桶中有一个名为“iTovenGUIImages/gui_media”的文件夹。我想将其放在系统上可能尚不存在的文件夹中。系统上的文件夹有自己的特殊前缀,只要其格式类似于文件夹路径,就可以是系统中您想要的任何内容。

import boto3
import cred
import os

locale_file_Imagedirectory = r"C:\\Temp\\App Data\\iToven AI\\\"  # This is where all GUI files for iToven AI exist on PC


def downloadImageDirectoryS3(remoteDirectoryName, desired_parent_folder):
    my_bucket = 'itovenbucket'
    s3_resource = boto3.resource('s3', aws_access_key_id=cred.AWSAccessKeyId,
                                 aws_secret_access_key=cred.AWSSecretKey)
    bucket = s3_resource.Bucket(my_bucket)
    for obj in bucket.objects.filter(Prefix=remoteDirectoryName):
        pcVersionPrefix = remoteDirectoryName.replace("/", r"\\\")
        isolatedFileName = obj.key.replace(remoteDirectoryName, "")
        clientSideFileName = desired_parent_folder+pcVersionPrefix+isolatedFileName
        print(clientSideFileName)  # Client-Side System File Structure
        if not os.path.exists(desired_parent_folder+pcVersionPrefix):  # CREATE DIRECTORIES FOR EACH FOLDER RECURSIVELY
            os.makedirs(desired_parent_folder+pcVersionPrefix)
        if obj.key not in desired_parent_folder+pcVersionPrefix:
            bucket.download_file(obj.key, clientSideFileName)  # save to new path


downloadImageDirectoryS3(r"iTovenGUIImagesPC/gui_media/", locale_file_Imagedirectory)

解决方案 19:

使用诸如 boto3 之类的 API连接到存储桶,可能按模式搜索,并将对象键映射到本地目录中(当不存在时动态创建它们)。

import boto3
from pathlib import Path

from dotenv import load_dotenv
load_dotenv()

s3 = boto3.resource('s3',     
                    aws_access_key_id=os.environ["AWS_KEY_ID"],
                    aws_secret_access_key=os.environ["AWS_ACCESS_KEY"],
                    region_name="eu-central-1")

bucket = s3.Bucket("my-important-bucket") #  your bucket on S3 AWS Storage
prefix = "_datasets/v9/..."  # prefix within the bucket
local_path = Path("/workspace/src/...") # your local path

for path in  bucket.objects.filter(Prefix=prefix).all():
    download_path = Path(local_path) / Path(path.key).relative_to(prefix)
    download_path.parent.mkdir(parents=True, exist_ok=True)
    if not download_path.is_dir():
        bucket.download_file(path.key, download_path)
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用