如何将数据从 mongodb 导入到 pandas?

2025-04-16 08:56:00
admin
原创
13
摘要:问题描述:我在 MongoDB 的一个集合中有大量数据需要分析。如何将这些数据导入 Pandas?我对 pandas 和 numpy 还不熟悉。编辑:MongoDB集合包含带有日期和时间标记的传感器值。传感器值为浮点数据类型。示例数据:{ "_cls" : "SensorRepo...

问题描述:

我在 MongoDB 的一个集合中有大量数据需要分析。如何将这些数据导入 Pandas?

我对 pandas 和 numpy 还不熟悉。

编辑:MongoDB集合包含带有日期和时间标记的传感器值。传感器值为浮点数据类型。

示例数据:

{
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}

解决方案 1:

pymongo可能会对你有所帮助,以下是我正在使用的一些代码:

import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)


    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df

解决方案 2:

你可以使用这段代码将 MongoDB 数据加载到 Pandas DataFrame 中。对我来说,这个方法很有效。希望对你也一样有效。

import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))

解决方案 3:

根据 PEP,简单比复杂好:

import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())

您可以像使用常规 mongoDB 数据库一样包含条件,甚至可以使用 find_one() 从数据库中获取仅一个元素,等等。

瞧!

解决方案 4:

Monary确实如此,而且速度超级快。(另一个链接)

查看这篇很酷的帖子,其中包括快速教程和一些时间安排。

解决方案 5:

我发现非常有用的另一个选项是:

from pandas.io.json import json_normalize

cursor = my_collection.find()
df = json_normalize(cursor)

(或者json_normalize(list(cursor)),取决于您的 python/pandas 版本)。

这样,您就可以免费获得嵌套 mongodb 文档的展开。

解决方案 6:

import pandas as pd
from odo import odo

data = odo('mongodb://localhost/db::collection', pd.DataFrame)

解决方案 7:

为了有效地处理核外(不适合 RAM)数据(即并行执行),您可以尝试Python Blaze 生态系统:Blaze / Dask / Odo。

Blaze(和Odo)具有开箱即用的功能来处理 MongoDB。

以下是一些有用的文章:

  • Blaze Expessions 介绍(附 MongoDB 查询示例)

  • ReproduceIt:Reddit 字数统计

  • Dask 数组和 Blaze 之间的区别

还有一篇文章展示了 Blaze 堆栈可以实现哪些令人惊叹的功能:使用 Blaze 和 Impala 分析 17 亿条 Reddit 评论(本质上,在几秒钟内查询 975 Gb 的 Reddit 评论)。

PS:我与这些技术均无关联。

解决方案 8:

使用

pandas.DataFrame(list(...))

如果迭代器/生成器的结果很大,将会消耗大量内存

最好生成小块并在最后连接

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)

解决方案 9:

您还可以使用pymongoarrow——它是 MongoDB 提供的官方库,用于将 mongodb 数据导出到 pandas、numPy、parquet 文件等。

解决方案 10:

http://docs.mongodb.org/manual/reference/mongoexport

导出为 csv 并使用read_csv
或 JSON 并使用DataFrame.from_records()

解决方案 11:

您可以使用pdmongo通过三行代码实现您想要的目标:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")

如果您的数据非常大,您可以先进行聚合查询,过滤掉不需要的数据,然后将它们映射到您想要的列。

Readings.a以下是映射到列a并按列过滤的示例reportCount

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [{'$match': {'reportCount': {'$gt': 6}}}, {'$unwind': '$Readings'}, {'$project': {'a': '$Readings.a'}}], "mongodb://localhost:27017/mydb")

read_mongo接受与pymongo 聚合相同的参数

解决方案 12:

虽然这是一篇旧帖子,但我认为它至今仍然非常相关,因为 MongoDB 和 Pandas 的受欢迎程度随着时间的推移而增加,并且将继续增加。

MongoDB 最近创建了一个名为“ PyMongoArrow ”的新库,它允许您仅用几行代码就轻松地将数据从 MongoDB 数据库移动到许多其他数据格式,例如 Pandas DataFrame、Numpy Array 或 Apache Arrow Table。

它开箱即用地支持多种数据类型,包括您提到的浮点型和日期时间型。有关支持的数据类型的更多详细信息,请参阅其文档。它基于 PyMongo 构建。

解决方案 13:

参考waitingkuo的精彩回答,我想补充一下使用 chunksize 实现类似.read_sql()和.read_csv()的功能。我扩展了Deu Leung的回答,避免了逐个遍历“迭代器”/“游标”的每个“记录”。我借用了之前的read_mongo函数。

def read_mongo(db, 
           collection, query={}, 
           host='localhost', port=27017, 
           username=None, password=None,
           chunksize = 100, no_id=True):
""" Read from Mongo and Store into DataFrame """


# Connect to MongoDB
#db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
client = MongoClient(host=host, port=port)
# Make a query to the specific DB and Collection
db_aux = client[db]


# Some variables to create the chunks
skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
if len(skips_variable)<=1:
    skips_variable = [0,len(skips_variable)]

# Iteration to create the dataframe in chunks.
for i in range(1,len(skips_variable)):

    # Expand the cursor and construct the DataFrame
    #df_aux =pd.DataFrame(list(cursor_aux[skips_variable[i-1]:skips_variable[i]]))
    df_aux =pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))

    if no_id:
        del df_aux['_id']

    # Concatenate the chunks into a unique df
    if 'df' not in locals():
        df =  df_aux
    else:
        df = pd.concat([df, df_aux], ignore_index=True)

return df

解决方案 14:

Rafael Valero、waitingkuo 和 Deu Leung 也采用了类似的分页方法:

def read_mongo(
       # db, 
       collection, query=None, 
       # host='localhost', port=27017, username=None, password=None,
       chunksize = 100, page_num=1, no_id=True):

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)

    # Sorry, this is in spanish
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df

解决方案 15:

  1. 在 shell 中启动 mongo:
    mongosh

  2. 在 shell 中向上滚动,直到看到 mongo 所连接的位置。它应该看起来像这样:
    mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.5.4

  3. 复制并粘贴到 mongoclient 中

  4. 以下是代码:

from pymongo import MongoClient
import pandas as pd

client = MongoClient('mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.5.4')

mydatabase = client.yourdatabasename
mycollection = mydatabase.yourcollectionname
cursor = mycollection.find()
listofDocuments = list(cursor)
df = pd.DataFrame(listofDocuments)
df

解决方案 16:

您可以使用“pandas.json_normalize”方法:

import pandas as pd
display(pd.json_normalize( x ))
display(pd.json_normalize( x , record_path="Readings" ))

它应该显示两个表,其中 x 是您的光标或:

from bson import ObjectId
def ISODate(st):
    return st

x = {
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用