有没有什么简单的方法可以对 Python 脚本进行基准测试?
- 2025-03-04 08:27:00
- admin 原创
- 85
问题描述:
通常我使用 shell 命令time
。我的目的是测试如果数据量是小、中、大或非常大,将占用多少时间和内存。
有没有什么适用于 Linux 或 Python 的工具可以做到这一点?
解决方案 1:
看看timeit、python 分析器和pycallgraph。另外,请务必查看下面提到“ SnakeViz ”的评论nikicc
。它为您提供了另一种有用的分析数据可视化。
时间
def test():
"""Stupid test function"""
lst = []
for i in range(100):
lst.append(i)
if __name__ == '__main__':
import timeit
print(timeit.timeit("test()", setup="from __main__ import test"))
# For Python>=3.5 one can also write:
print(timeit.timeit("test()", globals=locals()))
本质上,你可以将 Python 代码作为字符串参数传递给它,它将在指定的次数内运行并打印执行时间。文档中的重要部分:
timeit.timeit(stmt='pass', setup='pass', timer=<default timer>, number=1000000, globals=None)
Timer
使用给定的语句、设置
代码和 计时器函数
创建一个实例,并运行其timeit
方法并
执行多次。可选的全局参数指定要在其中执行代码的命名空间。
... 和:
Timer.timeit(number=1000000)
计算主语句的执行次数
。这将执行一次设置语句,然后返回执行主语句多次所需的时间,以秒为单位,以浮点数表示。参数是循环次数,默认为一百万。主语句、设置语句和要使用的计时器函数将传递给构造函数。注意:
默认情况下,在计时期间timeit
会暂时关闭。这种方法的优点是它使独立计时更具可比性。缺点是 GC 可能是被测函数性能的重要组成部分。如果是这样,可以将 GC 重新启用为设置字符串中的第一个语句。例如:garbage collection
timeit.Timer('for i in xrange(10): oct(i)', 'gc.enable()').timeit()
分析
分析将让你更详细地了解正在发生的事情。以下是来自官方文档的“即时示例” :
import cProfile
import re
cProfile.run('re.compile("foo|bar")')
这将为你带来:
197 function calls (192 primitive calls) in 0.002 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.001 0.001 <string>:1(<module>)
1 0.000 0.000 0.001 0.001 re.py:212(compile)
1 0.000 0.000 0.001 0.001 re.py:268(_compile)
1 0.000 0.000 0.000 0.000 sre_compile.py:172(_compile_charset)
1 0.000 0.000 0.000 0.000 sre_compile.py:201(_optimize_charset)
4 0.000 0.000 0.000 0.000 sre_compile.py:25(_identityfunction)
3/1 0.000 0.000 0.000 0.000 sre_compile.py:33(_compile)
这两个模块都应该能让您了解在哪里寻找瓶颈。
另外,要了解的输出profile
,请查看这篇文章
pycallgraph
注意: pycallgraph自 2018 年 2 月起已正式被废弃。不过截至 2020 年 12 月,它仍在 Python 3.6 上运行。只要 Python 公开分析 API 的方式没有核心变化,它应该仍然是一个有用的工具。
该模块使用 graphviz 创建如下所示的调用图:
您可以通过颜色轻松查看哪些路径耗时最多。您可以使用 pycallgraph API 或使用打包脚本创建它们:
pycallgraph graphviz -- ./mypythonscript.py
但开销相当大。因此对于已经运行很长时间的进程,创建图表可能需要一些时间。
解决方案 2:
我使用一个简单的装饰器来计时函数
import time
def st_time(func):
"""
st decorator to calculate the total time of a func
"""
def st_func(*args, **keyArgs):
t1 = time.time()
r = func(*args, **keyArgs)
t2 = time.time()
print("Function=%s, Time=%s" % (func.__name__, t2 - t1))
return r
return st_func
解决方案 3:
这个timeit
模块很慢而且很奇怪,所以我写了这个:
def timereps(reps, func):
from time import time
start = time()
for i in range(0, reps):
func()
end = time()
return (end - start) / reps
例子:
import os
listdir_time = timereps(10000, lambda: os.listdir('/'))
print "python can do %d os.listdir('/') per second" % (1 / listdir_time)
对我来说,它说的是:
python can do 40925 os.listdir('/') per second
这是一种原始的基准测试,但已经足够好了。
解决方案 4:
我通常会快速time ./script.py
查看一下需要多长时间。不过,这不会显示内存,至少默认情况下不会显示。您可以使用它/usr/bin/time -v ./script.py
来获取很多信息,包括内存使用情况。
解决方案 5:
内存分析器可满足您所有的内存需求。
https://pypi.python.org/pypi/memory_profiler
运行 pip 安装:
pip install memory_profiler
导入库:
import memory_profiler
为您要分析的项目添加一个装饰器:
@profile
def my_func():
a = [1] * (10 ** 6)
b = [2] * (2 * 10 ** 7)
del b
return a
if __name__ == '__main__':
my_func()
执行代码:
python -m memory_profiler example.py
接收输出:
Line # Mem usage Increment Line Contents
==============================================
3 @profile
4 5.97 MB 0.00 MB def my_func():
5 13.61 MB 7.64 MB a = [1] * (10 ** 6)
6 166.20 MB 152.59 MB b = [2] * (2 * 10 ** 7)
7 13.61 MB -152.59 MB del b
8 13.61 MB 0.00 MB return a
示例来自上面链接的文档。
解决方案 6:
line_profiler(逐行执行时间)
安装
pip install line_profiler
用法
在函数前添加
@profile
装饰器。例如:
@profile
def function(base, index, shift):
addend = index << shift
result = base + addend
return result
使用命令
kernprof -l <file_name>
创建line_profiler实例。例如:
kernprof -l test.py
如果成功, kernprof 将会打印Wrote profile results to <file_name>.lprof
。例如:
Wrote profile results to test.py.lprof
使用命令
python -m line_profiler <file_name>.lprof
打印基准测试结果。例如:
python -m line_profiler test.py.lprof
您将看到有关每行代码的详细信息:
Timer unit: 1e-06 s
Total time: 0.0021632 s
File: test.py
Function: function at line 1
Line # Hits Time Per Hit % Time Line Contents
==============================================================
1 @profile
2 def function(base, index, shift):
3 1000 796.4 0.8 36.8 addend = index << shift
4 1000 745.9 0.7 34.5 result = base + addend
5 1000 620.9 0.6 28.7 return result
memory_profiler(逐行显示内存使用情况)
安装
pip install memory_profiler
用法
在函数前添加
@profile
装饰器。例如:
@profile
def function():
result = []
for i in range(10000):
result.append(i)
return result
使用命令
python -m memory_profiler <file_name>
打印基准测试结果。例如:
python -m memory_profiler test.py
您将看到有关每行代码的详细信息:
Filename: test.py
Line # Mem usage Increment Occurences Line Contents
============================================================
1 40.246 MiB 40.246 MiB 1 @profile
2 def function():
3 40.246 MiB 0.000 MiB 1 result = []
4 40.758 MiB 0.008 MiB 10001 for i in range(10000):
5 40.758 MiB 0.504 MiB 10000 result.append(i)
6 40.758 MiB 0.000 MiB 1 return result
良好实践
多次调用一个函数以尽量减少对环境的影响。
解决方案 7:
snakeviz
cProfile 的交互式查看器
https://github.com/jiffyclub/snakeviz/
https://stackoverflow.com/a/1593034/895245中提到了 cProfile ,评论中提到了 snakeviz,但我想进一步强调它。
cprofile
仅通过查看/输出来调试程序性能非常困难pstats
,因为它们只能计算出每个函数的总时间。
然而,我们真正需要的是查看包含每个调用的堆栈跟踪的嵌套视图,以便轻松找到主要瓶颈。
这正是 snakeviz 通过其默认的“冰柱”视图所提供的。
首先,你必须将 cProfile 数据转储到二进制文件中,然后才能在该文件中执行 snakeviz
pip install -u snakeviz
python -m cProfile -o results.prof myscript.py
snakeviz results.prof
这将打印一个 URL 到 stdout,您可以在浏览器上打开它,其中包含所需的输出,如下所示:
然后您就可以:
将鼠标悬停在每个框上以查看包含该函数的文件的完整路径
单击一个框可使该框显示在顶部,以便放大
更多面向配置文件的问题:如何分析 Python 脚本?
解决方案 8:
如果您不想为 timeit 编写样板代码并希望获得易于分析的结果,请查看benchmarkit。它还保存了以前运行的历史记录,因此在开发过程中很容易比较相同的功能。
# pip install benchmarkit
from benchmarkit import benchmark, benchmark_run
N = 10000
seq_list = list(range(N))
seq_set = set(range(N))
SAVE_PATH = '/tmp/benchmark_time.jsonl'
@benchmark(num_iters=100, save_params=True)
def search_in_list(num_items=N):
return num_items - 1 in seq_list
@benchmark(num_iters=100, save_params=True)
def search_in_set(num_items=N):
return num_items - 1 in seq_set
benchmark_results = benchmark_run(
[search_in_list, search_in_set],
SAVE_PATH,
comment='initial benchmark search',
)
打印到终端并返回包含上次运行数据的字典列表。命令行入口点也可用。
如果您更改N=1000000
并重新运行
解决方案 9:
有多种方法可以对 Python 脚本进行基准测试。其中一种简单的方法是使用timeit模块,它提供了一种简单的方法来测量小代码片段的执行时间。但是,如果您正在寻找包含内存使用情况的更全面的基准测试,则可以使用memory_profiler包来测量内存使用情况。
要可视化基准测试,您可以使用plotly库,它允许您创建交互式图表。您可以创建折线图来显示不同输入大小的执行时间和内存使用情况。
下面是一个示例代码片段,用于对以矩阵、行和列作为输入的函数的两种不同实现进行基准测试:
import timeit
import random
import numpy as np
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from memory_profiler import memory_usage
from memory_profiler import profile
from my.package.module import real_func_1, real_func_2
@profile
def func_impl_1(matrix, row, column):
return real_func_1(matrix, row, column)
@profile
def func_impl_2(matrix, row, column):
return real_func_2(matrix, row, column)
# Analysis range
x = list(range(3, 100))
# Time results
y1 = []
y2 = []
# Memory results
m1 = []
m2 = []
for i in x:
# Random choice of parameters
A = np.random.rand(i, i)
rx = random.randint(0, i-1)
ry = random.randint(0, i-1)
t1 = 0
t2 = 0
m1_ = 0
m2_ = 0
for _ in range(10):
t1 += timeit.timeit(
lambda: func_impl_1(A, rx, ry),
number=1,
)
t2 += timeit.timeit(
lambda: func_impl_2(A, rx, ry),
number=1,
)
m1_ += max(memory_usage(
(lambda: func_impl_1(A, rx, ry),)
))
m2_ += max(memory_usage(
(lambda: func_impl_2(A, rx, ry),)
))
y1.append(t1/100)
y2.append(t2/100)
m1.append(m1_/100)
m2.append(m2_/100)
# Title of first graph:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, subplot_titles=("Time", "Memory"))
fig.add_trace(go.Scatter(x=x, y=y1, name='func_impl_1 time', legendgroup='1'), row=1, col=1)
fig.add_trace(go.Scatter(x=x, y=y2, name='func_impl_2 time', legendgroup='1'), row=1, col=1)
fig.add_trace(go.Scatter(x=x, y=m1, name='func_impl_1 memory', legendgroup='2'), row=2, col=1)
fig.add_trace(go.Scatter(x=x, y=m2, name='func_impl_2 memory', legendgroup='2'), row=2, col=1)
fig.update_layout(
title="Performance of the functions",
xaxis_title="Matrix size",
)
fig.update_yaxes(title_text="Time (s)", row=1, col=1)
fig.update_yaxes(title_text="Max Memory usage (MB)", row=2, col=1)
fig.show()
图表:
从图表来看,这两个函数的内存使用情况似乎相似,这一点值得注意。就运行时间而言,func_impl_2 似乎通常比 func_impl_1 更快,这也是一个积极的发现。但是,这两个函数之间的性能差异非常小,并且对于非常小的输入大小,func_impl_1 的性能在某个点上超过了 func_impl_2。这可能表明,即使 func_impl_2 通常更快,但更简单的 func_impl_1 实现对于较小的输入仍然是可行的选择。总体而言,这些图表提供了有关这些函数性能的宝贵见解,并有助于在不同场景中选择使用哪种实现时做出决策。
解决方案 10:
看一下nose和它的一个插件,特别是这个。
一旦安装,nose 就是您路径中的一个脚本,您可以在包含一些 python 脚本的目录中调用它:
$: nosetests
这将查找当前目录中的所有 python 文件,并执行它识别为测试的任何函数:例如,它将名称中带有单词 test_ 的任何函数识别为测试。
因此,您只需创建一个名为 test_yourfunction.py 的 Python 脚本并在其中写入如下内容:
$: cat > test_yourfunction.py
def test_smallinput():
yourfunction(smallinput)
def test_mediuminput():
yourfunction(mediuminput)
def test_largeinput():
yourfunction(largeinput)
然后你必须跑
$: nosetest --with-profile --profile-stats-file yourstatsprofile.prof testyourfunction.py
要读取配置文件,请使用以下 python 行:
python -c "import hotshot.stats ; stats = hotshot.stats.load('yourstatsprofile.prof') ; stats.sort_stats('time', 'calls') ; stats.print_stats(200)"
解决方案 11:
快速测试任何函数的简单方法是使用以下语法:%timeit my_code
例如 :
%timeit a = 1
13.4 ns ± 0.781 ns per loop (mean ± std. dev. of 7 runs, 100000000 loops each)
解决方案 12:
小心timeit
非常慢,在我的中等处理器上需要 12 秒才能初始化(或者运行该功能)。你可以测试这个接受的答案
def test():
lst = []
for i in range(100):
lst.append(i)
if __name__ == '__main__':
import timeit
print(timeit.timeit("test()", setup="from __main__ import test")) # 12 second
对于简单的事情,我将使用time
它,在我的电脑上它返回结果0.0
import time
def test():
lst = []
for i in range(100):
lst.append(i)
t1 = time.time()
test()
result = time.time() - t1
print(result) # 0.000000xxxx
解决方案 13:
根据 Danyun Liu 的回答,它有一些便捷的功能,也许对某些人有用。
def stopwatch(repeat=1, autorun=True):
"""
stopwatch decorator to calculate the total time of a function
"""
import timeit
import functools
def outer_func(func):
@functools.wraps(func)
def time_func(*args, **kwargs):
t1 = timeit.default_timer()
for _ in range(repeat):
r = func(*args, **kwargs)
t2 = timeit.default_timer()
print(f"Function={func.__name__}, Time={t2 - t1}")
return r
if autorun:
try:
time_func()
except TypeError:
raise Exception(f"{time_func.__name__}: autorun only works with no parameters, you may want to use @stopwatch(autorun=False)") from None
return time_func
if callable(repeat):
func = repeat
repeat = 1
return outer_func(func)
return outer_func
一些测试:
def is_in_set(x):
return x in {"linux", "darwin"}
def is_in_list(x):
return x in ["linux", "darwin"]
@stopwatch
def run_once():
import time
time.sleep(0.5)
@stopwatch(autorun=False)
def run_manually():
import time
time.sleep(0.5)
run_manually()
@stopwatch(repeat=10000000)
def repeat_set():
is_in_set("windows")
is_in_set("darwin")
@stopwatch(repeat=10000000)
def repeat_list():
is_in_list("windows")
is_in_list("darwin")
@stopwatch
def should_fail(x):
pass
结果:
Function=run_once, Time=0.5005391679987952
Function=run_manually, Time=0.500624185999186
Function=repeat_set, Time=1.7064883739985817
Function=repeat_list, Time=1.8905151920007484
Traceback (most recent call last):
(some more traceback here...)
Exception: should_fail: autorun only works with no parameters, you may want to use @stopwatch(autorun=False)
解决方案 14:
我编写了一个工具来对给定的函数进行并发压力测试,输出类似于 Apache AB。也许这就是你想要的:
import itertools
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
def create_counter():
"""
Atomic counter
"""
return itertools.count()
def get_and_increase(counter):
return next(counter)
class Context:
def __init__(self, num_threads, target_fun):
self.failed_counter = create_counter()
self.start_barrier = threading.Barrier(num_threads)
self.target_fun = target_fun
class Worker:
def __init__(self, context, num_per_thread):
self.context = context
self.num_per_thread = num_per_thread
self.time_takes_arr = []
def run(self):
target_fun = self.context.target_fun
start_barrier = self.context.start_barrier
failed_counter = self.context.failed_counter
time_takes_arr = self.time_takes_arr
start_barrier.wait()
for i in range(self.num_per_thread):
start = time.time_ns()
try:
target_fun()
except Exception as err:
get_and_increase(failed_counter)
finally:
time_takes_arr.append(time.time_ns() - start)
def test(num, num_threads, target_fun, num_warm_up=0):
if num_warm_up > 0:
for i in range(num_warm_up):
target_fun()
executor = ThreadPoolExecutor(max_workers=num_threads)
num_per_thread = num // num_threads
context = Context(num_threads, target_fun)
workers = [Worker(context, num_per_thread) for _ in range(num_threads)]
for worker in workers:
executor.submit(lambda: worker.run())
executor.shutdown(wait=True)
act_num = num_per_thread * num_threads
failed_num = get_and_increase(context.failed_counter)
time_takes_all = []
time_takes_thread = []
for worker in workers:
time_takes_arr = worker.time_takes_arr
time_takes_thread.append(sum(time_takes_arr))
time_takes_all.extend(time_takes_arr)
time_takes_all.sort()
time_takes_thread.sort()
total_time_takes = time_takes_thread[-1]
return num_threads, total_time_takes, act_num, failed_num, time_takes_all
def format(test_name, num_threads, total_time_takes, act_num, failed_num, time_takes_all):
idx50 = act_num // 2
idx66 = act_num * 66 // 100
idx75 = act_num * 75 // 100
idx80 = act_num * 80 // 100
idx90 = act_num * 90 // 100
idx95 = act_num * 95 // 100
idx98 = act_num * 98 // 100
idx99 = act_num * 99 // 100
time_sum = sum(time_takes_all)
tps = 1000_000_000 * num_threads * (act_num / time_sum)
time_takes_avg = time_sum / len(time_takes_all) / 1000_000
return f"""{test_name} test result:
Concurrency Level: {num_threads:d}
Time taken for tests: {total_time_takes / 1000000:.6f} ms
Complete Tasks: {act_num:,}
Failed Tasks: {failed_num:,}
Tasks per second: {tps:,.2f}
Time per task: {time_takes_avg:.9f} ms
Time per task: {time_takes_avg / num_threads} ms (across all concurrent tasks)
Shortest task: {time_takes_all[0] / 1000000:.9f} ms
Percentage of the tasks served within a certain time (ms)
50% {time_takes_all[idx50] / 1000000:.6f}
66% {time_takes_all[idx66] / 1000000:.6f}
75% {time_takes_all[idx75] / 1000000:.6f}
80% {time_takes_all[idx80] / 1000000:.6f}
90% {time_takes_all[idx90] / 1000000:.6f}
95% {time_takes_all[idx95] / 1000000:.6f}
98% {time_takes_all[idx98] / 1000000:.6f}
99% {time_takes_all[idx99] / 1000000:.6f}
100% {time_takes_all[-1] / 1000000:.6f} (longest task)"""
def test_and_print(test_name, num, num_threads, target_fun, num_warm_up=10):
num_threads, total_time_takes, act_num, failed_num, time_takes_all = test(num, num_threads, target_fun, num_warm_up)
print(format(test_name, num_threads, total_time_takes, act_num, failed_num, time_takes_all))
if __name__ == "__main__":
c1 = create_counter()
def task():
get_and_increase(c1)
test_and_print("My test",
7000000, 50
, task
, 20)
print()
print(f"c1:{get_and_increase(c1)}")
输出结果为:
My test test result:
Concurrency Level: 50
Time taken for tests: 1544.781000 ms
Complete Tasks: 7,000,000
Failed Tasks: 0
Tasks per second: 10,133,790.35
Time per task: 0.004933988 ms
Time per task: 9.867976000000001e-05 ms (across all concurrent tasks)
Shortest task: -0.003000000 ms
Percentage of the tasks served within a certain time (ms)
50% 0.000000
66% 0.000000
75% 0.000000
80% 0.000000
90% 0.001000
95% 0.001000
98% 0.001000
99% 0.001000
100% 1120.871000 (longest task)
扫码咨询,免费领取项目管理大礼包!