有没有什么简单的方法可以对 Python 脚本进行基准测试?

2025-03-04 08:27:00
admin
原创
86
摘要:问题描述:通常我使用 shell 命令time。我的目的是测试如果数据量是小、中、大或非常大,将占用多少时间和内存。有没有什么适用于 Linux 或 Python 的工具可以做到这一点?解决方案 1:看看timeit、python 分析器和pycallgraph。另外,请务必查看下面提到“ SnakeViz ...

问题描述:

通常我使用 shell 命令time。我的目的是测试如果数据量是小、中、大或非常大,将占用多少时间和内存。

有没有什么适用于 Linux 或 Python 的工具可以做到这一点?


解决方案 1:

看看timeit、python 分析器和pycallgraph。另外,请务必查看下面提到“ SnakeViz ”的评论nikicc。它为您提供了另一种有用的分析数据可视化。

时间

def test():
    """Stupid test function"""
    lst = []
    for i in range(100):
        lst.append(i)

if __name__ == '__main__':
    import timeit
    print(timeit.timeit("test()", setup="from __main__ import test"))

    # For Python>=3.5 one can also write:
    print(timeit.timeit("test()", globals=locals()))

本质上,你可以将 Python 代码作为字符串参数传递给它,它将在指定的次数内运行并打印执行时间。文档中的重要部分:

timeit.timeit(stmt='pass', setup='pass', timer=<default timer>, number=1000000, globals=None)Timer使用给定的语句、设置
代码和 计时器函数
创建一个实例,并运行其timeit方法并
执行多次。可选的全局参数指定要在其中执行代码的命名空间。

... 和:

Timer.timeit(number=1000000)计算主语句的执行次数
。这将执行一次设置语句,然后返回执行主语句多次所需的时间,以秒为单位,以浮点数表示。参数是循环次数,默认为一百万。主语句、设置语句和要使用的计时器函数将传递给构造函数。

注意:
默认情况下,在计时期间timeit会暂时关闭。这种方法的优点是它使独立计时更具可比性。缺点是 GC 可能是被测函数性能的重要组成部分。如果是这样,可以将 GC 重新启用为设置字符串中的第一个语句。例如:garbage collection

timeit.Timer('for i in xrange(10): oct(i)', 'gc.enable()').timeit()

分析

分析将让你更详细地了解正在发生的事情。以下是来自官方文档的“即时示例” :

import cProfile
import re
cProfile.run('re.compile("foo|bar")')

这将为你带来:

      197 function calls (192 primitive calls) in 0.002 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.001    0.001 <string>:1(<module>)
     1    0.000    0.000    0.001    0.001 re.py:212(compile)
     1    0.000    0.000    0.001    0.001 re.py:268(_compile)
     1    0.000    0.000    0.000    0.000 sre_compile.py:172(_compile_charset)
     1    0.000    0.000    0.000    0.000 sre_compile.py:201(_optimize_charset)
     4    0.000    0.000    0.000    0.000 sre_compile.py:25(_identityfunction)
   3/1    0.000    0.000    0.000    0.000 sre_compile.py:33(_compile)

这两个模块都应该能让您了解在哪里寻找瓶颈。

另外,要了解的输出profile,请查看这篇文章

pycallgraph

注意: pycallgraph自 2018 年 2 月起已正式被废弃。不过截至 2020 年 12 月,它仍在 Python 3.6 上运行。只要 Python 公开分析 API 的方式没有核心变化,它应该仍然是一个有用的工具。

该模块使用 graphviz 创建如下所示的调用图:

调用图示例

您可以通过颜色轻松查看哪些路径耗时最多。您可以使用 pycallgraph API 或使用打包脚本创建它们:

pycallgraph graphviz -- ./mypythonscript.py

但开销相当大。因此对于已经运行很长时间的进程,创建图表可能需要一些时间。

解决方案 2:

我使用一个简单的装饰器来计时函数

import time

def st_time(func):
    """
        st decorator to calculate the total time of a func
    """

    def st_func(*args, **keyArgs):
        t1 = time.time()
        r = func(*args, **keyArgs)
        t2 = time.time()
        print("Function=%s, Time=%s" % (func.__name__, t2 - t1))
        return r

    return st_func

解决方案 3:

这个timeit模块很慢而且很奇怪,所以我写了这个:

def timereps(reps, func):
    from time import time
    start = time()
    for i in range(0, reps):
        func()
    end = time()
    return (end - start) / reps

例子:

import os
listdir_time = timereps(10000, lambda: os.listdir('/'))
print "python can do %d os.listdir('/') per second" % (1 / listdir_time)

对我来说,它说的是:

python can do 40925 os.listdir('/') per second

这是一种原始的基准测试,但已经足够好了。

解决方案 4:

我通常会快速time ./script.py查看一下需要多长时间。不过,这不会显示内存,至少默认情况下不会显示。您可以使用它/usr/bin/time -v ./script.py来获取很多信息,包括内存使用情况。

解决方案 5:

内存分析器可满足您所有的内存需求。

https://pypi.python.org/pypi/memory_profiler

运行 pip 安装:

pip install memory_profiler

导入库:

import memory_profiler

为您要分析的项目添加一个装饰器:

@profile
def my_func():
    a = [1] * (10 ** 6)
    b = [2] * (2 * 10 ** 7)
    del b
    return a

if __name__ == '__main__':
    my_func()

执行代码:

python -m memory_profiler example.py

接收输出:

 Line #    Mem usage  Increment   Line Contents
 ==============================================
 3                           @profile
 4      5.97 MB    0.00 MB   def my_func():
 5     13.61 MB    7.64 MB       a = [1] * (10 ** 6)
 6    166.20 MB  152.59 MB       b = [2] * (2 * 10 ** 7)
 7     13.61 MB -152.59 MB       del b
 8     13.61 MB    0.00 MB       return a

示例来自上面链接的文档。

解决方案 6:

line_profiler(逐行执行时间)

安装

pip install line_profiler

用法

  • 在函数前添加@profile装饰器。例如:

@profile
def function(base, index, shift):
    addend = index << shift
    result = base + addend
    return result
  • 使用命令kernprof -l <file_name>创建line_profiler实例。例如:

kernprof -l test.py

如果成功, kernprof 将会打印Wrote profile results to <file_name>.lprof。例如:

Wrote profile results to test.py.lprof
  • 使用命令python -m line_profiler <file_name>.lprof打印基准测试结果。例如:

python -m line_profiler test.py.lprof

您将看到有关每行代码的详细信息:

Timer unit: 1e-06 s

Total time: 0.0021632 s
File: test.py
Function: function at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     1                                           @profile
     2                                           def function(base, index, shift):
     3      1000        796.4      0.8     36.8      addend = index << shift
     4      1000        745.9      0.7     34.5      result = base + addend
     5      1000        620.9      0.6     28.7      return result

memory_profiler(逐行显示内存使用情况)

安装

pip install memory_profiler

用法

  • 在函数前添加@profile装饰器。例如:

@profile
def function():
    result = []
    for i in range(10000):
        result.append(i)
    return result
  • 使用命令python -m memory_profiler <file_name>打印基准测试结果。例如:

python -m memory_profiler test.py

您将看到有关每行代码的详细信息:

Filename: test.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     1   40.246 MiB   40.246 MiB           1   @profile
     2                                         def function():
     3   40.246 MiB    0.000 MiB           1       result = []
     4   40.758 MiB    0.008 MiB       10001       for i in range(10000):
     5   40.758 MiB    0.504 MiB       10000           result.append(i)
     6   40.758 MiB    0.000 MiB           1       return result

良好实践

多次调用一个函数以尽量减少对环境的影响。

解决方案 7:

snakevizcProfile 的交互式查看器

https://github.com/jiffyclub/snakeviz/

https://stackoverflow.com/a/1593034/895245中提到了 cProfile ,评论中提到了 snakeviz,但我想进一步强调它。

cprofile仅通过查看/输出来调试程序性能非常困难pstats,因为它们只能计算出每个函数的总时间。

然而,我们真正需要的是查看包含每个调用的堆栈跟踪的嵌套视图,以便轻松找到主要瓶颈。

这正是 snakeviz 通过其默认的“冰柱”视图所提供的。

首先,你必须将 cProfile 数据转储到二进制文件中,然后才能在该文件中执行 snakeviz

pip install -u snakeviz
python -m cProfile -o results.prof myscript.py
snakeviz results.prof

这将打印一个 URL 到 stdout,您可以在浏览器上打开它,其中包含所需的输出,如下所示:

在此处输入图片描述

然后您就可以:

  • 将鼠标悬停在每个框上以查看包含该函数的文件的完整路径

  • 单击一个框可使该框显示在顶部,以便放大

更多面向配置文件的问题:如何分析 Python 脚本?

解决方案 8:

如果您不想为 timeit 编写样板代码并希望获得易于分析的结果,请查看benchmarkit。它还保存了以前运行的历史记录,因此在开发过程中很容易比较相同的功能。

# pip install benchmarkit

from benchmarkit import benchmark, benchmark_run

N = 10000
seq_list = list(range(N))
seq_set = set(range(N))

SAVE_PATH = '/tmp/benchmark_time.jsonl'

@benchmark(num_iters=100, save_params=True)
def search_in_list(num_items=N):
    return num_items - 1 in seq_list

@benchmark(num_iters=100, save_params=True)
def search_in_set(num_items=N):
    return num_items - 1 in seq_set

benchmark_results = benchmark_run(
   [search_in_list, search_in_set],
   SAVE_PATH,
   comment='initial benchmark search',
)  

打印到终端并返回包含上次运行数据的字典列表。命令行入口点也可用。

在此处输入图片描述

如果您更改N=1000000并重新运行

在此处输入图片描述

解决方案 9:

有多种方法可以对 Python 脚本进行基准测试。其中一种简单的方法是使用timeit模块,它提供了一种简单的方法来测量小代码片段的执行时间。但是,如果您正在寻找包含内存使用情况的更全面的基准测试,则可以使用memory_profiler包来测量内存使用情况。

要可视化基准测试,您可以使用plotly库,它允许您创建交互式图表。您可以创建折线图来显示不同输入大小的执行时间和内存使用情况。

下面是一个示例代码片段,用于对以矩阵、行和列作为输入的函数的两种不同实现进行基准测试:

import timeit
import random
import numpy as np

from plotly.subplots import make_subplots
import plotly.graph_objects as go


from memory_profiler import memory_usage
from memory_profiler import profile

from my.package.module import real_func_1, real_func_2

@profile
def func_impl_1(matrix, row, column):
    return real_func_1(matrix, row, column)

@profile
def func_impl_2(matrix, row, column):
    return real_func_2(matrix, row, column)


# Analysis range
x = list(range(3, 100))

# Time results
y1 = []
y2 = []

# Memory results
m1 = []
m2 = []


for i in x:
    # Random choice of parameters
    A = np.random.rand(i, i)
    rx = random.randint(0, i-1)
    ry = random.randint(0, i-1)

    t1 = 0
    t2 = 0

    m1_ = 0
    m2_ = 0

    for _ in range(10):
        t1 += timeit.timeit(
            lambda: func_impl_1(A, rx, ry),
            number=1,
        )

        t2 += timeit.timeit(
            lambda: func_impl_2(A, rx, ry),
            number=1,
        )

        m1_ += max(memory_usage(
            (lambda: func_impl_1(A, rx, ry),)
        ))

        m2_ += max(memory_usage(
            (lambda: func_impl_2(A, rx, ry),)
        ))


    y1.append(t1/100)
    y2.append(t2/100)

    m1.append(m1_/100)
    m2.append(m2_/100)

# Title of first graph:

fig = make_subplots(rows=2, cols=1, shared_xaxes=True, subplot_titles=("Time", "Memory"))

fig.add_trace(go.Scatter(x=x, y=y1, name='func_impl_1 time', legendgroup='1'), row=1, col=1)
fig.add_trace(go.Scatter(x=x, y=y2, name='func_impl_2 time', legendgroup='1'), row=1, col=1)

fig.add_trace(go.Scatter(x=x, y=m1, name='func_impl_1 memory', legendgroup='2'), row=2, col=1)
fig.add_trace(go.Scatter(x=x, y=m2, name='func_impl_2 memory', legendgroup='2'), row=2, col=1)


fig.update_layout(
    title="Performance of the functions",
    xaxis_title="Matrix size",
)

fig.update_yaxes(title_text="Time (s)", row=1, col=1)
fig.update_yaxes(title_text="Max Memory usage (MB)", row=2, col=1)

fig.show()

图表:
带有时间和内存基准的图表

从图表来看,这两个函数的内存使用情况似乎相似,这一点值得注意。就运行时间而言,func_impl_2 似乎通常比 func_impl_1 更快,这也是一个积极的发现。但是,这两个函数之间的性能差异非常小,并且对于非常小的输入大小,func_impl_1 的性能在某个点上超过了 func_impl_2。这可能表明,即使 func_impl_2 通常更快,但更简单的 func_impl_1 实现对于较小的输入仍然是可行的选择。总体而言,这些图表提供了有关这些函数性能的宝贵见解,并有助于在不同场景中选择使用哪种实现时做出决策。

解决方案 10:

看一下nose和它的一个插件,特别是这个。

一旦安装,nose 就是您路径中的一个脚本,您可以在包含一些 python 脚本的目录中调用它:

$: nosetests

这将查找当前目录中的所有 python 文件,并执行它识别为测试的任何函数:例如,它将名称中带有单词 test_ 的任何函数识别为测试。

因此,您只需创建一个名为 test_yourfunction.py 的 Python 脚本并在其中写入如下内容:

$: cat > test_yourfunction.py

def test_smallinput():
    yourfunction(smallinput)

def test_mediuminput():
    yourfunction(mediuminput)

def test_largeinput():
    yourfunction(largeinput)

然后你必须跑

$: nosetest --with-profile --profile-stats-file yourstatsprofile.prof testyourfunction.py

要读取配置文件,请使用以下 python 行:

python -c "import hotshot.stats ; stats = hotshot.stats.load('yourstatsprofile.prof') ; stats.sort_stats('time', 'calls') ; stats.print_stats(200)"

解决方案 11:

快速测试任何函数的简单方法是使用以下语法:
%timeit my_code

例如 :

%timeit a = 1

13.4 ns ± 0.781 ns per loop (mean ± std. dev. of 7 runs, 100000000 loops each)

解决方案 12:

小心timeit非常慢,在我的中等处理器上需要 12 秒才能初始化(或者运行该功能)。你可以测试这个接受的答案

def test():
    lst = []
    for i in range(100):
        lst.append(i)

if __name__ == '__main__':
    import timeit
    print(timeit.timeit("test()", setup="from __main__ import test")) # 12 second

对于简单的事情,我将使用time它,在我的电脑上它返回结果0.0

import time

def test():
    lst = []
    for i in range(100):
        lst.append(i)

t1 = time.time()

test()

result = time.time() - t1
print(result) # 0.000000xxxx

解决方案 13:

根据 Danyun Liu 的回答,它有一些便捷的功能,也许对某些人有用。

def stopwatch(repeat=1, autorun=True):
    """
    stopwatch decorator to calculate the total time of a function
    """
    import timeit
    import functools
    
    def outer_func(func):
        @functools.wraps(func)
        def time_func(*args, **kwargs):
            t1 = timeit.default_timer()
            for _ in range(repeat):
                r = func(*args, **kwargs)
            t2 = timeit.default_timer()
            print(f"Function={func.__name__}, Time={t2 - t1}")
            return r
        
        if autorun:
            try:
                time_func()
            except TypeError:
                raise Exception(f"{time_func.__name__}: autorun only works with no parameters, you may want to use @stopwatch(autorun=False)") from None
        
        return time_func
    
    if callable(repeat):
        func = repeat
        repeat = 1
        return outer_func(func)
    
    return outer_func

一些测试:

def is_in_set(x):
    return x in {"linux", "darwin"}

def is_in_list(x):
    return x in ["linux", "darwin"]

@stopwatch
def run_once():
    import time
    time.sleep(0.5)

@stopwatch(autorun=False)
def run_manually():
    import time
    time.sleep(0.5)

run_manually()

@stopwatch(repeat=10000000)
def repeat_set():
    is_in_set("windows")
    is_in_set("darwin")

@stopwatch(repeat=10000000)
def repeat_list():
    is_in_list("windows")
    is_in_list("darwin")

@stopwatch
def should_fail(x):
    pass

结果:

Function=run_once, Time=0.5005391679987952
Function=run_manually, Time=0.500624185999186
Function=repeat_set, Time=1.7064883739985817
Function=repeat_list, Time=1.8905151920007484
Traceback (most recent call last):
  (some more traceback here...)
Exception: should_fail: autorun only works with no parameters, you may want to use @stopwatch(autorun=False)

解决方案 14:

我编写了一个工具来对给定的函数进行并发压力测试,输出类似于 Apache AB。也许这就是你想要的:

import itertools
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


def create_counter():
    """
    Atomic counter
    """
    return itertools.count()


def get_and_increase(counter):
    return next(counter)


class Context:
    def __init__(self, num_threads, target_fun):
        self.failed_counter = create_counter()
        self.start_barrier = threading.Barrier(num_threads)
        self.target_fun = target_fun


class Worker:
    def __init__(self, context, num_per_thread):
        self.context = context
        self.num_per_thread = num_per_thread
        self.time_takes_arr = []

    def run(self):
        target_fun = self.context.target_fun
        start_barrier = self.context.start_barrier
        failed_counter = self.context.failed_counter
        time_takes_arr = self.time_takes_arr
        start_barrier.wait()
        for i in range(self.num_per_thread):
            start = time.time_ns()
            try:
                target_fun()
            except Exception as err:
                get_and_increase(failed_counter)
            finally:
                time_takes_arr.append(time.time_ns() - start)


def test(num, num_threads, target_fun, num_warm_up=0):
    if num_warm_up > 0:
        for i in range(num_warm_up):
            target_fun()

    executor = ThreadPoolExecutor(max_workers=num_threads)

    num_per_thread = num // num_threads
    context = Context(num_threads, target_fun)
    workers = [Worker(context, num_per_thread) for _ in range(num_threads)]

    for worker in workers:
        executor.submit(lambda: worker.run())
    executor.shutdown(wait=True)

    act_num = num_per_thread * num_threads
    failed_num = get_and_increase(context.failed_counter)

    time_takes_all = []
    time_takes_thread = []
    for worker in workers:
        time_takes_arr = worker.time_takes_arr
        time_takes_thread.append(sum(time_takes_arr))
        time_takes_all.extend(time_takes_arr)
    time_takes_all.sort()
    time_takes_thread.sort()

    total_time_takes = time_takes_thread[-1]

    return num_threads, total_time_takes, act_num, failed_num, time_takes_all


def format(test_name, num_threads, total_time_takes, act_num, failed_num, time_takes_all):
    idx50 = act_num // 2
    idx66 = act_num * 66 // 100
    idx75 = act_num * 75 // 100
    idx80 = act_num * 80 // 100
    idx90 = act_num * 90 // 100
    idx95 = act_num * 95 // 100
    idx98 = act_num * 98 // 100
    idx99 = act_num * 99 // 100

    time_sum = sum(time_takes_all)
    tps = 1000_000_000 * num_threads * (act_num / time_sum)
    time_takes_avg = time_sum / len(time_takes_all) / 1000_000

    return f"""{test_name} test result:
 Concurrency Level: {num_threads:d}
 Time taken for tests:  {total_time_takes / 1000000:.6f} ms
 Complete Tasks:    {act_num:,}
 Failed Tasks:      {failed_num:,}
 Tasks per second:  {tps:,.2f}
 Time per task:     {time_takes_avg:.9f} ms
 Time per task:     {time_takes_avg / num_threads} ms (across all concurrent tasks)
 Shortest task:     {time_takes_all[0] / 1000000:.9f} ms
 Percentage of the tasks served within a certain time (ms)
  50%   {time_takes_all[idx50] / 1000000:.6f}
  66%   {time_takes_all[idx66] / 1000000:.6f}
  75%   {time_takes_all[idx75] / 1000000:.6f}
  80%   {time_takes_all[idx80] / 1000000:.6f}
  90%   {time_takes_all[idx90] / 1000000:.6f}
  95%   {time_takes_all[idx95] / 1000000:.6f}
  98%   {time_takes_all[idx98] / 1000000:.6f}
  99%   {time_takes_all[idx99] / 1000000:.6f}
 100%   {time_takes_all[-1] / 1000000:.6f} (longest task)"""


def test_and_print(test_name, num, num_threads, target_fun, num_warm_up=10):
    num_threads, total_time_takes, act_num, failed_num, time_takes_all = test(num, num_threads, target_fun, num_warm_up)
    print(format(test_name, num_threads, total_time_takes, act_num, failed_num, time_takes_all))


if __name__ == "__main__":
    c1 = create_counter()


    def task():
        get_and_increase(c1)


    test_and_print("My test",
                   7000000, 50
                   , task
                   , 20)
    print()
    print(f"c1:{get_and_increase(c1)}")

输出结果为:

My test test result:
 Concurrency Level: 50
 Time taken for tests:  1544.781000 ms
 Complete Tasks:    7,000,000
 Failed Tasks:      0
 Tasks per second:  10,133,790.35
 Time per task:     0.004933988 ms
 Time per task:     9.867976000000001e-05 ms (across all concurrent tasks)
 Shortest task:     -0.003000000 ms
 Percentage of the tasks served within a certain time (ms)
  50%   0.000000
  66%   0.000000
  75%   0.000000
  80%   0.000000
  90%   0.001000
  95%   0.001000
  98%   0.001000
  99%   0.001000
 100%   1120.871000 (longest task)
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   4053  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   2777  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Freshdesk、ClickUp、nTask、Hubstaff、Plutio、Productive、Targa、Bonsai、Wrike。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在项目管理过程中面临着诸多痛点,如任务分配不...
项目管理系统   95  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Monday、TeamGantt、Filestage、Chanty、Visor、Smartsheet、Productive、Quire、Planview。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多项目经理和团队在管理复杂项目时,常...
开源项目管理工具   111  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Smartsheet、GanttPRO、Backlog、Visor、ResourceGuru、Productive、Xebrio、Hive、Quire。在当今快节奏的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在选择项目管理工具时常常面临困惑:...
项目管理系统   97  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用