使用 boto3 对 dynamoDb 进行完整扫描

2025-03-19 08:57:00
admin
原创
51
摘要:问题描述:我的表大约有 220mb,里面有 250k 条记录。我试图将所有这些数据拉入 Python。我意识到这需要分块批处理并循环进行,但我不确定如何将批处理设置为从上一个批次停止的地方开始。有什么方法可以过滤我的扫描?据我所知,过滤发生在加载后,加载在 1mb 时停止,因此我实际上无法扫描新对象。任何帮助...

问题描述:

我的表大约有 220mb,里面有 250k 条记录。我试图将所有这些数据拉入 Python。我意识到这需要分块批处理并循环进行,但我不确定如何将批处理设置为从上一个批次停止的地方开始。

有什么方法可以过滤我的扫描?据我所知,过滤发生在加载后,加载在 1mb 时停止,因此我实际上无法扫描新对象。

任何帮助都将不胜感激。

import boto3
dynamodb = boto3.resource('dynamodb',
    aws_session_token = aws_session_token,
    aws_access_key_id = aws_access_key_id,
    aws_secret_access_key = aws_secret_access_key,
    region_name = region
    )

table = dynamodb.Table('widgetsTableName')

data = table.scan()

解决方案 1:

我认为有关表扫描的Amazon DynamoDB 文档回答了您的问题。

简而言之,您需要检查LastEvaluatedKey响应。以下是使用您的代码的示例:

import boto3
dynamodb = boto3.resource('dynamodb',
                          aws_session_token=aws_session_token,
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key,
                          region_name=region
)

table = dynamodb.Table('widgetsTableName')

response = table.scan()
data = response['Items']

while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])

解决方案 2:

DynamoDB 将该scan方法限制为每次扫描 1mb 数据。

文档: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan

以下是使用以下命令从 DynamoDB 表中获取所有数据的示例循环LastEvaluatedKey

import boto3
client = boto3.client('dynamodb')

def dump_table(table_name):
    results = []
    last_evaluated_key = None
    while True:
        if last_evaluated_key:
            response = client.scan(
                TableName=table_name,
                ExclusiveStartKey=last_evaluated_key
            )
        else: 
            response = client.scan(TableName=table_name)
        last_evaluated_key = response.get('LastEvaluatedKey')
        
        results.extend(response['Items'])
        
        if not last_evaluated_key:
            break
    return results

# Usage
data = dump_table('your-table-name')

# do something with data

解决方案 3:

boto3 提供分页器来为您处理所有分页细节。以下是扫描分页器的文档页面。基本上,您可以像这样使用它:

import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')

for page in paginator.paginate():
    # do something

解决方案 4:

借鉴乔丹·菲利普斯的回答,下面是如何传递FilterExpression分页信息的方法:

import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_parameters = {
  'TableName': 'foo',
  'FilterExpression': 'bar > :x AND bar < :y',
  'ExpressionAttributeValues': {
    ':x': {'S': '2017-01-31T01:35'},
    ':y': {'S': '2017-01-31T02:08'},
  }
}

page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    # do something

解决方案 5:

正如@kungphu 提到的,删除 dynamodb 格式类型的代码。

import boto3

from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
service_model = client._service_model.operation_model('Query')
trans = TransformationInjector(deserializer = TypeDeserializer())
for page in paginator.paginate():
    trans.inject_attribute_value_output(page, service_model)

解决方案 6:

如果您来这里是为了寻找带有一些过滤表达式的分页扫描:

def scan(table, **kwargs):
    response = table.scan(**kwargs)
    yield from response['Items']
    while response.get('LastEvaluatedKey'):
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
        yield from response['Items']

使用示例:

table = boto3.Session(...).resource('dynamodb').Table('widgetsTableName')

items = list(scan(table, FilterExpression=Attr('name').contains('foo')))

解决方案 7:

事实证明,Boto3 捕获了“LastEvaluatedKey”作为返回响应的一部分。这可以用作扫描的起点:

data= table.scan(
   ExclusiveStartKey=data['LastEvaluatedKey']
)

我计划围绕此建立一个循环,直到返回的数据只是 ExclusiveStartKey

解决方案 8:

上面建议的两种方法都存在问题:要么编写冗长而重复的代码,在循环中明确处理分页,要么使用带有低级会话的 Boto 分页器,从而放弃高级 Boto 对象的优势。

使用 Python 函数代码提供高级抽象的解决方案允许使用更高级别的 Boto 方法,同时隐藏 AWS 分页的复杂性:

import itertools
import typing

def iterate_result_pages(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Generator:
    """A wrapper for functions using AWS paging, that returns a generator which yields a sequence of items for
    every response

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        A generator which yields the 'Items' field of the result for every response
    """
    response = function_returning_response(*args, **kwargs)
    yield response["Items"]
    while "LastEvaluatedKey" in response:
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = function_returning_response(*args, **kwargs)
        yield response["Items"]

    return

def iterate_paged_results(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Iterator:
    """A wrapper for functions using AWS paging, that returns an iterator of all the items in the responses.
    Items are yielded to the caller as soon as they are received.

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        An iterator which yields one response item at a time
    """
    return itertools.chain.from_iterable(iterate_result_pages(function_returning_response, *args, **kwargs))

# Example, assuming 'table' is a Boto DynamoDB table object:
all_items = list(iterate_paged_results(ProjectionExpression = 'my_field'))

解决方案 9:

我对 Vincent 的答案有些疑问,因为转换应用于 LastEvaluatedKey 并弄乱了分页。解决方法如下:

import boto3

from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_model = client._service_model.operation_model('Scan')
trans = TransformationInjector(deserializer = TypeDeserializer())
operation_parameters = {
  'TableName': 'tablename',  
}
items = []

for page in paginator.paginate(**operation_parameters):
    has_last_key = 'LastEvaluatedKey' in page
    if has_last_key:
        last_key = page['LastEvaluatedKey'].copy()
    trans.inject_attribute_value_output(page, operation_model)
    if has_last_key:
        page['LastEvaluatedKey'] = last_key
    items.extend(page['Items'])

解决方案 10:

我不明白为什么 Boto3 提供了高级资源抽象,却不提供分页功能。即使提供了分页功能,也很难使用!

这个问题的其他答案都很好,但我想要一种超级简单的方法来包装 boto3 方法并使用生成器提供内存高效的分页:

import typing
import boto3
import boto3.dynamodb.conditions


def paginate_dynamodb_response(dynamodb_action: typing.Callable, **kwargs) -> typing.Generator[dict, None, None]:

    # Using the syntax from https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/python/example_code/dynamodb/GettingStarted/scenario_getting_started_movies.py
    keywords = kwargs

    done = False
    start_key = None

    while not done:
        if start_key:
            keywords['ExclusiveStartKey'] = start_key

        response = dynamodb_action(**keywords)

        start_key = response.get('LastEvaluatedKey', None)
        done = start_key is None

        for item in response.get("Items", []):
            yield item


## Usage ##
dynamodb_res = boto3.resource('dynamodb')
dynamodb_table = dynamodb_res.Table('my-table')

query = paginate_dynamodb_response(
    dynamodb_table.query, # The boto3 method. E.g. query or scan
    # Regular Query or Scan parameters
    #
    # IndexName='myindex' # If required
    KeyConditionExpression=boto3.dynamodb.conditions.Key('id').eq('1234')
)

for x in query:
    print(x)```

解决方案 11:

一个具体的函数,用于获取所有数据并根据您的需要传递所需的参数。希望这能有所帮助!祝您编码愉快!

import boto3


def get_all_items_from_db(table_name: str, **scan_kwargs):
    """
    Get all items from the DynamoDB table, avoiding pagination
    param: table_name
    param: scan_kwargs

    return: DynamoDB reponse object
    """
    dynamodb = boto3.resource('dynamodb')
    dynamodb_table = dynamodb.Table(table_name)
    response = dynamodb_table.scan(**scan_kwargs)  # Pass required parameters as per needs

    # Getting the last evaluated key for next pagination repsonse
    last_evaluted_key = response.get('LastEvaluatedKey')
    # Paginate returning up to 1MB of data for each iteration
    while last_evaluted_key:
        paginated_response = dynamodb_table.scan(
            **scan_kwargs,
            ExclusiveStartKey=last_evaluted_key,  # To start the iteration from the last evaluation key
        )
        last_evaluted_key = paginated_response.get('LastEvaluatedKey')
        # Extending the result list to include the paginated response
        response['Items'].extend(paginated_response['Items'])
    return response
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用