高效的循环缓冲区?

2025-04-16 08:56:00
admin
原创
17
摘要:问题描述:我想在 python 中创建一个高效的循环缓冲区(目的是取缓冲区中整数值的平均值)。这是使用列表收集值的有效方法吗?def add_to_buffer( self, num ): self.mylist.pop( 0 ) self.mylist.append( num ) 什么会更有...

问题描述:

我想在 python 中创建一个高效的循环缓冲区(目的是取缓冲区中整数值的平均值)。

这是使用列表收集值的有效方法吗?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

什么会更有效率(以及为什么)?


解决方案 1:

我会用collections.deque一个maxlen参数

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

文档里有一个和你想要的类似的方法deque。我之所以断言它是最高效的,完全是因为它是由一个技术精湛、习惯于编写顶级代码的团队用 C 语言实现的。

解决方案 2:

虽然这里已经有很多很棒的答案,但我找不到任何关于上述选项时间的直接比较。因此,请参考下面我所做的比较。

仅出于测试目的,该类可以在list基于的缓冲区、collections.deque基于的缓冲区和Numpy.roll基于的缓冲区之间切换。

请注意,为了简单起见,该update方法每次只添加一个值。

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method
        self.update = getattr(self, '_update_' + buffer_method)

    def _update_list(self, scalar):
        try:
            # shift
            self.content.append(scalar)
            self.content.pop(0)
        except AttributeError:
            # init
            self.content = [0.] * self.size

    def _update_deque(self, scalar):
        try:
            # shift
            self.content.append(scalar)
        except AttributeError:
            # init
            self.content = collections.deque([0.] * self.size, maxlen=self.size)

    def _update_roll(self, scalar):
        try:
            # shift
            self.content = numpy.roll(self.content, -1)
            self.content[-1] = scalar
        except IndexError:
            # init
            self.content = numpy.zeros(self.size, dtype=float)


# Testing and Timing
circular_buffer_size = 100
circular_buffers = [
    CircularBuffer(buffer_size=circular_buffer_size, buffer_method=method)
    for method in CircularBuffer.buffer_methods
]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size
    )
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == list(range(circular_buffer_size))
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations))
    )
    print(
        '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
            cb.method,
            timeit_results[-1],
            timeit_results[-1] / timeit_iterations * 1e3,
        )
    )

在我的系统上,结果如下:

deque: total 0.87s (0.09ms per iteration)
list:  total 1.06s (0.11ms per iteration)
roll:  total 6.27s (0.63ms per iteration)

解决方案 3:

从列表头部弹出会导致复制整个列表,因此效率低下

您应该使用固定大小的列表/数组和在添加/删除项目时在缓冲区中移动的索引

解决方案 4:

根据MoonCactus 的回答,这里有一个circularlist类。与他的版本的区别在于,这里 c[0]总是会给出最旧追加的元素、c[-1]最新追加的元素、c[-2]倒数第二个元素……这对于应用程序来说更自然。

c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]

班级:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)

解决方案 5:

可以使用 deque 类,但对于问题的要求(平均),这是我的解决方案:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14

解决方案 6:

Python 的双端队列很慢。你也可以使用 numpy.roll 来代替。
如何旋转形状为 (n,) 或 (n,1) 的 numpy 数组中的数字?

在这个基准测试中,双端队列耗时 448 毫秒。Numpy.roll 耗时 29 毫秒
http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

解决方案 7:

您还可以查看这个相当古老的Python 配方。

这是我自己的带有 NumPy 数组的版本:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '    ' + str(self.size)
        s = s + '    ' + self.get_all()[::-1].__repr__()
        s = s + '    ' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value

解决方案 8:

Python Cookbook 中的解决方案怎么样,包括当环形缓冲区实例变满时对其进行重新分类?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

实现中值得注意的设计选择是,由于这些对象在其生命周期的某个时刻会经历不可逆的状态转换——从非满缓冲区到满缓冲区(并且行为在此时发生变化)——我通过更改 来模拟这种情况self.__class__。即使在 Python 2.2 中,只要两个类具有相同的槽位,这种方法也能正常工作(例如,对于两个经典类,例如__Full本例中的 RingBuffer 和 ,这种方法就很好用)。

在许多语言中,更改实例的类名可能显得有些奇怪,但对于其他表示偶尔、大规模、不可逆且离散的状态变化(这些变化会极大地影响行为)的方式而言,这是一种 Python 式的替代方法,就像本例中那样。值得庆幸的是,Python 支持所有类型的类。

图片来源:Sébastien Keim

解决方案 9:

这里有很多答案,但没有一个像D Left Adjoint to U建议的那样将 Numpy ndarray 子类化。这样可以避免使用无法有效扩展的 np.roll ,并保留 Numpy 数组的所有优点,例如数组切片。使用 Numpy 数组可以进行大多数您需要运行的分析,包括求平均值。

RingArray 类

我的解决方案使用Numpy 文档中写的指南对 np.ndarray 进行子类化。

RingArray 以指定的形状初始化,并用 np.nan 值填充。

Itertools 循环用于创建一个一维循环,该循环给出数组中下一行要编辑的位置。这基于初始化时数组的高度。

向 ndarray 方法添加了一个附加方法,用于将数据写入循环中的下一个位置。

class RingArray(np.ndarray):
    """A modified numpy array type that functions like a stack. 
    RingArray has a set size specified during initialisation. 
    Add new data using the append() method, which will replace the 
    next value in a cyclical fashion. The array itself has all the 
    properties of a numpy array e.g. it can be sliced and accessed as 
    normal. Initially fills the array with np.nan values.
    
    Options
    --------
    shape : tuple
        A tuple of (height, width) for the maximum size of the array.

    Attributes
    ----------
    Inherited from nd.array. Initially fills array with np.nan values.
    
    Methods
    --------
    append(data)
        Add/replace data in the next element of the cycle.
        Data should be the length of the RingArray width.
    
    """    
    def __new__(subtype, shape):
        obj = super().__new__(subtype, shape)
        
        obj = np.vectorize(lambda x: np.nan)(obj)
        
        obj._pointer = cycle(np.arange(0, shape[0]))
        
        return obj
    
    # needed by numpy
    def __array_finalize__(self, obj):
         if obj is None: return
        
    # add data to the next element (looped)
    def append(self, data):
        """Adds or replaces data in the RingArray.
        The function writes to the next row in the Array.
        Once the last row is reached, the assignment row 
        loops back to the start.

        Parameters
        ----------
        data : array_like
            Data should be the length of the RingArray width.
        """        
        self[next(self._pointer)] = data

表现

我相信这种方法在 O(1) 上可扩展,但是我不是计算机科学家,所以如果我错了,请纠正我!

可能的问题

由于这是 ndarray 的子类,因此该类的所有方法都可以在 RingArray 上使用。使用 np.delete 等数组函数删除或添加值会改变数组的形状。由于循环是在初始化时设置的,因此这会导致错误。因此,使用 append() 以外的任何方法编辑数组时请务必谨慎。

这是我的第一篇 Stack Overflow 帖子,如果有任何我可以改进的地方,请告诉我:)。

解决方案 10:

我在进行串行编程之前就遇到过这个问题。当时也就是一年多前,我也找不到任何高效的实现,所以我最终自己写了一个 C 扩展,它也可以在 pypi 上使用 MIT 许可证。它非常基础,只能处理 8 位有符号字符的缓冲区,但长度灵活,所以如果你需要字符以外的其他数据,可以在其上使用 Struct 或其他结构。不过,现在我用谷歌搜索了一下,发现现在有很多选择,所以你可能也想看看这些。

解决方案 11:

这个不需要任何库。它增长一个列表,然后按索引循环。

占用空间非常小(无需任何库),运行速度至少是 dequeue 的两倍。这对于计算移动平均值确实很有用,但请注意,这些项目不像上面那样按年龄排序。

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index= 0
        self.size= size
        self._data = []

    def record(self, value):
        """append an element"""
        if len(self._data) == self.size:
            self._data[self.index]= value
        else:
            self._data.append(value)
        self.index= (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

    def get_all(self):
        """return a list of all the elements"""
        return(self._data)

要获得平均值,例如:

q= CircularBuffer(1000000);
for i in range(40000):
    q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

结果:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

这大约是出队时所需时间的 1/3。

解决方案 12:

来自 Github:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structs/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

解决方案 13:

最初的问题是:“高效的”循环缓冲区。根据所要求的效率,aaronasterling 的答案似乎完全正确。使用一个用 Python 编写的专用类,并与 collections.deque 进行比较,结果显示 deque 的速度提高了 5.2 倍!以下是一段非常简单的测试代码:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

要将双端队列转换为列表,只需使用:

my_list = [v for v in my_deque]

这样,你就能以 O(1) 的复杂度随机访问双端队列中的元素。当然,这只有在设置一次双端队列后需要多次随机访问时才有用。

解决方案 14:

我这里找不到答案。显然,如果你在 NumPy 中工作,你通常需要子类化数组或 ndarray,这样(至少在你的循环数组写满之后)你仍然可以在循环数组上使用 NumPy 数组的算术运算。唯一需要注意的是,对于跨越多个组件的操作(例如移动平均线),你的窗口不要大于缓冲区中累积的大小。

另外,正如所有评论者提到的,不要使用滚动,因为这违背了提高效率的目的。如果需要一个不断增长的数组,只需在每次需要调整大小时将其大小加倍即可(这与循环数组的实现不同)。

解决方案 15:

这与用于保存最新文本消息的一些缓冲区应用了相同的原理。

import time
import datetime
import sys, getopt

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '
' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 10 secounds write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用