SQL Alchemy ORM 返回单个列,如何避免常见的后期处理
- 2025-03-14 08:57:00
- admin 原创
- 73
问题描述:
我正在使用 SQL Alchemy 的 ORM,我发现当我返回单个列时,我得到的结果如下:
[(result,), (result_2,)] # etc...
有了这样的一套,我发现我必须经常这样做:
results = [r[0] for r in results] # So that I just have a list of result values
这并不是那么“糟糕”,因为我的结果集通常很小,但如果结果集不大,这可能会增加大量开销。最大的问题是,我觉得它使源代码变得混乱,而错过这一步是我遇到的一个相当常见的错误。
有什么方法可以避免这个额外的步骤吗?
相关的题外话:在这种情况下,orm 的这种行为似乎不方便,但在另一种情况下,我的结果集是 [(id, value)],最终结果是这样的:
[(result_1_id, result_1_val), (result_2_id, result_2_val)]
然后我可以这样做:
results = dict(results) # so I have a map of id to value
这样做的好处是,在返回结果后,这可以作为一个有用的步骤。
这真的是个问题吗,还是我只是在吹毛求疵,在获得结果集之后进行后处理对这两种情况都有意义?我相信我们可以想出一些其他常见的后处理操作,使结果集在应用程序代码中更易于使用。是否有全面的高性能和便捷解决方案,或者后处理是不可避免的,只是不同的应用程序用途需要?
当我的应用程序能够真正利用 SQL Alchemy 的 ORM 返回的对象时,它似乎非常有用,但在我不能或不想使用的情况下,就没那么有用了。这是否只是 ORM 的一个常见问题?在这种情况下,我最好不要使用 ORM 层?
我想我应该展示一个我正在谈论的实际 orm 查询的例子:
session.query(OrmObj.column_name).all()
或者
session.query(OrmObj.id_column_name, OrmObj.value_column_name).all()
当然,在实际查询中通常会有一些过滤器等等。
解决方案 1:
减少源中的混乱的一种方法是进行如下迭代:
results = [r for (r, ) in results]
尽管此解决方案比使用[]
运算符长一个字符,但我认为它更容易被眼睛接受。
为了减少混乱,请删除括号。但这会使阅读代码时更难注意到您实际上正在处理元组:
results = [r for r, in results]
解决方案 2:
Python 的 zip 与 * 内联扩展运算符相结合可以非常方便地解决此问题:
>>> results = [('result',), ('result_2',), ('result_3',)]
>>> zip(*results)
[('result', 'result_2', 'result_3')]
然后你只需要 [0] 索引一次。对于如此短的列表,你的理解会更快:
>>> timeit('result = zip(*[("result",), ("result_2",), ("result_3",)])', number=10000)
0.010490894317626953
>>> timeit('result = [ result[0] for result in [("result",), ("result_2",), ("result_3",)] ]', number=10000)
0.0028390884399414062
然而对于更长的列表,zip 应该更快:
>>> timeit('result = zip(*[(1,)]*100)', number=10000)
0.049577951431274414
>>> timeit('result = [ result[0] for result in [(1,)]*100 ]', number=10000)
0.11178708076477051
因此,由您来决定哪种方法更适合您的情况。
解决方案 3:
从 1.4 版本开始,SQLAlchemy 提供了一种以值列表形式检索单个列的结果的方法:
# ORM
>>> session.scalars(select(User.name)).all()
['ed', 'wendy', 'mary', 'fred']
# or
>>> query = session.query(User.name)
>>> session.scalars(query).all()
['ed', 'wendy', 'mary', 'fred']
# Core
>>> with engine.connect() as connection:
... result = connection.execute(text("select name from users"))
... result.scalars().all()
...
['ed', 'wendy', 'mary', 'fred']
参见SQLAlchemy 文档。
解决方案 4:
我也对此苦苦挣扎,直到我意识到它就像任何其他查询一样:
for result in results:
print result.column_name
解决方案 5:
我发现以下内容更具可读性,还包括字典的答案(在 Python 2.7 中):
d = {id_: name for id_, name in session.query(Customer.id, Customer.name).all()}
l = [r.id for r in session.query(Customer).all()]
对于单一价值,借用另一个答案:
l = [name for (name, ) in session.query(Customer.name).all()]
与内置zip
解决方案进行比较,适应列表:
l = list(zip(*session.query(Customer.id).all())[0])
在我看来,它只能提供大约 4% 的速度提升。
解决方案 6:
我的解决方案如下:)
def column(self):
for column, *_ in Model.query.with_entities(Model.column).all():
yield column
注意:仅限 py3。
解决方案 7:
哇,伙计们,为什么要紧张?有更陡峭的方法,更快,更优雅)
>>> results = [('result',), ('result_2',), ('result_3',)]
>>> sum(results, tuple())
('result', 'result_2', 'result_3')
速度:
>>> timeit('result = zip(*[("result",), ("result_2",), ("result_3",)])', number=10000)
0.004222994000883773
>>> timeit('result = sum([("result",), ("result_2",), ("result_3",)], ())', number=10000)
0.0038205889868550003
但如果列表中有更多元素- 仅使用zip。 Zip 速度更快。
扫码咨询,免费领取项目管理大礼包!