高效的循环缓冲区?
- 2025-04-16 08:56:00
- admin 原创
- 16
问题描述:
我想在 python 中创建一个高效的循环缓冲区(目的是取缓冲区中整数值的平均值)。
这是使用列表收集值的有效方法吗?
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
什么会更有效率(以及为什么)?
解决方案 1:
我会用collections.deque
一个maxlen
参数
>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
... d.append(i)
...
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)
文档里有一个和你想要的类似的方法deque
。我之所以断言它是最高效的,完全是因为它是由一个技术精湛、习惯于编写顶级代码的团队用 C 语言实现的。
解决方案 2:
虽然这里已经有很多很棒的答案,但我找不到任何关于上述选项时间的直接比较。因此,请参考下面我所做的比较。
仅出于测试目的,该类可以在list
基于的缓冲区、collections.deque
基于的缓冲区和Numpy.roll
基于的缓冲区之间切换。
请注意,为了简单起见,该update
方法每次只添加一个值。
import numpy
import timeit
import collections
class CircularBuffer(object):
buffer_methods = ('list', 'deque', 'roll')
def __init__(self, buffer_size, buffer_method):
self.content = None
self.size = buffer_size
self.method = buffer_method
self.update = getattr(self, '_update_' + buffer_method)
def _update_list(self, scalar):
try:
# shift
self.content.append(scalar)
self.content.pop(0)
except AttributeError:
# init
self.content = [0.] * self.size
def _update_deque(self, scalar):
try:
# shift
self.content.append(scalar)
except AttributeError:
# init
self.content = collections.deque([0.] * self.size, maxlen=self.size)
def _update_roll(self, scalar):
try:
# shift
self.content = numpy.roll(self.content, -1)
self.content[-1] = scalar
except IndexError:
# init
self.content = numpy.zeros(self.size, dtype=float)
# Testing and Timing
circular_buffer_size = 100
circular_buffers = [
CircularBuffer(buffer_size=circular_buffer_size, buffer_method=method)
for method in CircularBuffer.buffer_methods
]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
# We add a convenient number of convenient values (see equality test below)
code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
i, circular_buffer_size
)
# Testing
eval(code)
buffer_content = [item for item in cb.content]
assert buffer_content == list(range(circular_buffer_size))
# Timing
timeit_results.append(
timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations))
)
print(
'{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
cb.method,
timeit_results[-1],
timeit_results[-1] / timeit_iterations * 1e3,
)
)
在我的系统上,结果如下:
deque: total 0.87s (0.09ms per iteration)
list: total 1.06s (0.11ms per iteration)
roll: total 6.27s (0.63ms per iteration)
解决方案 3:
从列表头部弹出会导致复制整个列表,因此效率低下
您应该使用固定大小的列表/数组和在添加/删除项目时在缓冲区中移动的索引
解决方案 4:
根据MoonCactus 的回答,这里有一个circularlist
类。与他的版本的区别在于,这里 c[0]
总是会给出最旧追加的元素、c[-1]
最新追加的元素、c[-2]
倒数第二个元素……这对于应用程序来说更自然。
c = circularlist(4)
c.append(1); print(c, c[0], c[-1]) #[1] (1/4 items) 1 1
c.append(2); print(c, c[0], c[-1]) #[1, 2] (2/4 items) 1 2
c.append(3); print(c, c[0], c[-1]) #[1, 2, 3] (3/4 items) 1 3
c.append(8); print(c, c[0], c[-1]) #[1, 2, 3, 8] (4/4 items) 1 8
c.append(10); print(c, c[0], c[-1]) #[2, 3, 8, 10] (4/4 items) 2 10
c.append(11); print(c, c[0], c[-1]) #[3, 8, 10, 11] (4/4 items) 3 11
d = circularlist(4, [1, 2, 3, 4, 5]) #[2, 3, 4, 5]
班级:
class circularlist(object):
def __init__(self, size, data = []):
"""Initialization"""
self.index = 0
self.size = size
self._data = list(data)[-size:]
def append(self, value):
"""Append an element"""
if len(self._data) == self.size:
self._data[self.index] = value
else:
self._data.append(value)
self.index = (self.index + 1) % self.size
def __getitem__(self, key):
"""Get element by index, relative to the current index"""
if len(self._data) == self.size:
return(self._data[(key + self.index) % self.size])
else:
return(self._data[key])
def __repr__(self):
"""Return string representation"""
return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)
解决方案 5:
可以使用 deque 类,但对于问题的要求(平均),这是我的解决方案:
>>> from collections import deque
>>> class CircularBuffer(deque):
... def __init__(self, size=0):
... super(CircularBuffer, self).__init__(maxlen=size)
... @property
... def average(self): # TODO: Make type check for integer or floats
... return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
... cb.append(i)
... print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
解决方案 6:
Python 的双端队列很慢。你也可以使用 numpy.roll 来代替。
如何旋转形状为 (n,) 或 (n,1) 的 numpy 数组中的数字?
在这个基准测试中,双端队列耗时 448 毫秒。Numpy.roll 耗时 29 毫秒
http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
解决方案 7:
您还可以查看这个相当古老的Python 配方。
这是我自己的带有 NumPy 数组的版本:
#!/usr/bin/env python
import numpy as np
class RingBuffer(object):
def __init__(self, size_max, default_value=0.0, dtype=float):
"""initialization"""
self.size_max = size_max
self._data = np.empty(size_max, dtype=dtype)
self._data.fill(default_value)
self.size = 0
def append(self, value):
"""append an element"""
self._data = np.roll(self._data, 1)
self._data[0] = value
self.size += 1
if self.size == self.size_max:
self.__class__ = RingBufferFull
def get_all(self):
"""return a list of elements from the oldest to the newest"""
return(self._data)
def get_partial(self):
return(self.get_all()[0:self.size])
def __getitem__(self, key):
"""get element"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
s = self._data.__repr__()
s = s + ' ' + str(self.size)
s = s + ' ' + self.get_all()[::-1].__repr__()
s = s + ' ' + self.get_partial()[::-1].__repr__()
return(s)
class RingBufferFull(RingBuffer):
def append(self, value):
"""append an element when buffer is full"""
self._data = np.roll(self._data, 1)
self._data[0] = value
解决方案 8:
Python Cookbook 中的解决方案怎么样,包括当环形缓冲区实例变满时对其进行重新分类?
class RingBuffer:
""" class that implements a not-yet-full buffer """
def __init__(self,size_max):
self.max = size_max
self.data = []
class __Full:
""" class that implements a full buffer """
def append(self, x):
""" Append an element overwriting the oldest one. """
self.data[self.cur] = x
self.cur = (self.cur+1) % self.max
def get(self):
""" return list of elements in correct order """
return self.data[self.cur:]+self.data[:self.cur]
def append(self,x):
"""append an element at the end of the buffer"""
self.data.append(x)
if len(self.data) == self.max:
self.cur = 0
# Permanently change self's class from non-full to full
self.__class__ = self.__Full
def get(self):
""" Return a list of elements from the oldest to the newest. """
return self.data
# sample usage
if __name__=='__main__':
x=RingBuffer(5)
x.append(1); x.append(2); x.append(3); x.append(4)
print(x.__class__, x.get())
x.append(5)
print(x.__class__, x.get())
x.append(6)
print(x.data, x.get())
x.append(7); x.append(8); x.append(9); x.append(10)
print(x.data, x.get())
实现中值得注意的设计选择是,由于这些对象在其生命周期的某个时刻会经历不可逆的状态转换——从非满缓冲区到满缓冲区(并且行为在此时发生变化)——我通过更改 来模拟这种情况
self.__class__
。即使在 Python 2.2 中,只要两个类具有相同的槽位,这种方法也能正常工作(例如,对于两个经典类,例如__Full
本例中的 RingBuffer 和 ,这种方法就很好用)。在许多语言中,更改实例的类名可能显得有些奇怪,但对于其他表示偶尔、大规模、不可逆且离散的状态变化(这些变化会极大地影响行为)的方式而言,这是一种 Python 式的替代方法,就像本例中那样。值得庆幸的是,Python 支持所有类型的类。
图片来源:Sébastien Keim
解决方案 9:
这里有很多答案,但没有一个像D Left Adjoint to U建议的那样将 Numpy ndarray 子类化。这样可以避免使用无法有效扩展的 np.roll ,并保留 Numpy 数组的所有优点,例如数组切片。使用 Numpy 数组可以进行大多数您需要运行的分析,包括求平均值。
RingArray 类
我的解决方案使用Numpy 文档中写的指南对 np.ndarray 进行子类化。
RingArray 以指定的形状初始化,并用 np.nan 值填充。
Itertools 循环用于创建一个一维循环,该循环给出数组中下一行要编辑的位置。这基于初始化时数组的高度。
向 ndarray 方法添加了一个附加方法,用于将数据写入循环中的下一个位置。
class RingArray(np.ndarray):
"""A modified numpy array type that functions like a stack.
RingArray has a set size specified during initialisation.
Add new data using the append() method, which will replace the
next value in a cyclical fashion. The array itself has all the
properties of a numpy array e.g. it can be sliced and accessed as
normal. Initially fills the array with np.nan values.
Options
--------
shape : tuple
A tuple of (height, width) for the maximum size of the array.
Attributes
----------
Inherited from nd.array. Initially fills array with np.nan values.
Methods
--------
append(data)
Add/replace data in the next element of the cycle.
Data should be the length of the RingArray width.
"""
def __new__(subtype, shape):
obj = super().__new__(subtype, shape)
obj = np.vectorize(lambda x: np.nan)(obj)
obj._pointer = cycle(np.arange(0, shape[0]))
return obj
# needed by numpy
def __array_finalize__(self, obj):
if obj is None: return
# add data to the next element (looped)
def append(self, data):
"""Adds or replaces data in the RingArray.
The function writes to the next row in the Array.
Once the last row is reached, the assignment row
loops back to the start.
Parameters
----------
data : array_like
Data should be the length of the RingArray width.
"""
self[next(self._pointer)] = data
表现
我相信这种方法在 O(1) 上可扩展,但是我不是计算机科学家,所以如果我错了,请纠正我!
可能的问题
由于这是 ndarray 的子类,因此该类的所有方法都可以在 RingArray 上使用。使用 np.delete 等数组函数删除或添加值会改变数组的形状。由于循环是在初始化时设置的,因此这会导致错误。因此,使用 append() 以外的任何方法编辑数组时请务必谨慎。
这是我的第一篇 Stack Overflow 帖子,如果有任何我可以改进的地方,请告诉我:)。
解决方案 10:
我在进行串行编程之前就遇到过这个问题。当时也就是一年多前,我也找不到任何高效的实现,所以我最终自己写了一个 C 扩展,它也可以在 pypi 上使用 MIT 许可证。它非常基础,只能处理 8 位有符号字符的缓冲区,但长度灵活,所以如果你需要字符以外的其他数据,可以在其上使用 Struct 或其他结构。不过,现在我用谷歌搜索了一下,发现现在有很多选择,所以你可能也想看看这些。
解决方案 11:
这个不需要任何库。它增长一个列表,然后按索引循环。
占用空间非常小(无需任何库),运行速度至少是 dequeue 的两倍。这对于计算移动平均值确实很有用,但请注意,这些项目不像上面那样按年龄排序。
class CircularBuffer(object):
def __init__(self, size):
"""initialization"""
self.index= 0
self.size= size
self._data = []
def record(self, value):
"""append an element"""
if len(self._data) == self.size:
self._data[self.index]= value
else:
self._data.append(value)
self.index= (self.index + 1) % self.size
def __getitem__(self, key):
"""get element by index like a regular array"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
def get_all(self):
"""return a list of all the elements"""
return(self._data)
要获得平均值,例如:
q= CircularBuffer(1000000);
for i in range(40000):
q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())
结果:
capacity= 1000000
stored= 40000
average= 19999
real 0m0.024s
user 0m0.020s
sys 0m0.000s
这大约是出队时所需时间的 1/3。
解决方案 12:
来自 Github:
class CircularBuffer:
def __init__(self, size):
"""Store buffer in given storage."""
self.buffer = [None]*size
self.low = 0
self.high = 0
self.size = size
self.count = 0
def isEmpty(self):
"""Determines if buffer is empty."""
return self.count == 0
def isFull(self):
"""Determines if buffer is full."""
return self.count == self.size
def __len__(self):
"""Returns number of elements in buffer."""
return self.count
def add(self, value):
"""Adds value to buffer, overwrite as needed."""
if self.isFull():
self.low = (self.low+1) % self.size
else:
self.count += 1
self.buffer[self.high] = value
self.high = (self.high + 1) % self.size
def remove(self):
"""Removes oldest value from non-empty buffer."""
if self.count == 0:
raise Exception ("Circular Buffer is empty");
value = self.buffer[self.low]
self.low = (self.low + 1) % self.size
self.count -= 1
return value
def __iter__(self):
"""Return elements in the circular buffer in order using iterator."""
idx = self.low
num = self.count
while num > 0:
yield self.buffer[idx]
idx = (idx + 1) % self.size
num -= 1
def __repr__(self):
"""String representation of circular buffer."""
if self.isEmpty():
return 'cb:[]'
return 'cb:[' + ','.join(map(str,self)) + ']'
https://github.com/heineman/python-data-structs/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py
解决方案 13:
最初的问题是:“高效的”循环缓冲区。根据所要求的效率,aaronasterling 的答案似乎完全正确。使用一个用 Python 编写的专用类,并与 collections.deque 进行比较,结果显示 deque 的速度提高了 5.2 倍!以下是一段非常简单的测试代码:
class cb:
def __init__(self, size):
self.b = [0]*size
self.i = 0
self.sz = size
def append(self, v):
self.b[self.i] = v
self.i = (self.i + 1) % self.sz
b = cb(1000)
for i in range(10000):
b.append(i)
# called 200 times, this lasts 1.097 second on my laptop
from collections import deque
b = deque( [], 1000 )
for i in range(10000):
b.append(i)
# called 200 times, this lasts 0.211 second on my laptop
要将双端队列转换为列表,只需使用:
my_list = [v for v in my_deque]
这样,你就能以 O(1) 的复杂度随机访问双端队列中的元素。当然,这只有在设置一次双端队列后需要多次随机访问时才有用。
解决方案 14:
我这里找不到答案。显然,如果你在 NumPy 中工作,你通常需要子类化数组或 ndarray,这样(至少在你的循环数组写满之后)你仍然可以在循环数组上使用 NumPy 数组的算术运算。唯一需要注意的是,对于跨越多个组件的操作(例如移动平均线),你的窗口不要大于缓冲区中累积的大小。
另外,正如所有评论者提到的,不要使用滚动,因为这违背了提高效率的目的。如果需要一个不断增长的数组,只需在每次需要调整大小时将其大小加倍即可(这与循环数组的实现不同)。
解决方案 15:
这与用于保存最新文本消息的一些缓冲区应用了相同的原理。
import time
import datetime
import sys, getopt
class textbffr(object):
def __init__(self, size_max):
#initialization
self.posn_max = size_max-1
self._data = [""]*(size_max)
self.posn = self.posn_max
def append(self, value):
#append an element
if self.posn == self.posn_max:
self.posn = 0
self._data[self.posn] = value
else:
self.posn += 1
self._data[self.posn] = value
def __getitem__(self, key):
#return stored element
if (key + self.posn+1) > self.posn_max:
return(self._data[key - (self.posn_max-self.posn)])
else:
return(self._data[key + self.posn+1])
def print_bffr(bffr,bffer_max):
for ind in range(0,bffer_max):
stored = bffr[ind]
if stored != "":
print(stored)
print ( '
' )
def make_time_text(time_value):
return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
+ str(time_value.hour).zfill(2) + str(time_value.minute).zfill(2)
+ str(time_value.second).zfill(2))
def main(argv):
#Set things up
starttime = datetime.datetime.now()
log_max = 5
status_max = 7
log_bffr = textbffr(log_max)
status_bffr = textbffr(status_max)
scan_count = 1
#Main Loop
# every 10 secounds write a line with the time and the scan count.
while True:
time_text = make_time_text(datetime.datetime.now())
#create next messages and store in buffers
status_bffr.append(str(scan_count).zfill(6) + " : Status is just fine at : " + time_text)
log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")
#print whole buffers so far
print_bffr(log_bffr,log_max)
print_bffr(status_bffr,status_max)
time.sleep(2)
scan_count += 1
if __name__ == '__main__':
main(sys.argv[1:])
扫码咨询,免费领取项目管理大礼包!