什么是好的速率限制算法?
- 2025-02-17 09:25:00
- admin 原创
- 62
问题描述:
我可以使用一些伪代码,或者更好的 Python。我正在尝试为 Python IRC 机器人实现速率限制队列,并且它部分有效,但如果有人触发的消息少于限制(例如,速率限制为每 8 秒 5 条消息,而该人仅触发 4 条),并且下一次触发超过 8 秒(例如,16 秒后),机器人会发送消息,但队列已满,机器人会等待 8 秒,即使由于 8 秒期限已过,因此不需要等待。
解决方案 1:
这里是最简单的算法,如果您只想在消息到达太快时丢弃它们(而不是将它们排队,这是有意义的,因为队列可能会变得任意大):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
此解决方案中没有数据结构、计时器等,并且运行干净 :) 可以看到,“限额”以每秒最多 5/8 个单位的速度增长,即每八秒最多五个单位。每转发一条消息都会扣除一个单位,因此每八秒您不能发送超过五条消息。
请注意,rate
应为整数,即没有非零小数部分,否则算法将无法正常工作(实际利率将不会是rate/per
)。例如,由于永远不会增长到 1.0,所以rate=0.5; per=1.0;
不起作用。但工作正常。allowance
`rate=1.0; per=2.0;`
解决方案 2:
在入队函数之前使用此装饰器@RateLimited(ratepersec)。
基本上,这会检查自上次以来是否已过去 1/rate 秒,如果没有,则等待剩余时间,否则不等待。这实际上限制了您的速率/秒。装饰器可以应用于您想要限制速率的任何函数。
在您的情况下,如果您希望每 8 秒最多发送 5 条消息,请在 sendToQueue 函数之前使用 @RateLimited(0.625)。
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
解决方案 3:
令牌桶的实现相当简单。
从一个装有 5 个代币的桶开始。
每 5/8 秒:如果存储桶中的令牌少于 5 个,则添加一个。
每次要发送消息时:如果存储桶中有≥1个令牌,则取出一个令牌并发送消息。否则,等待/丢弃消息/等等。
(显然,在实际代码中,您将使用整数计数器而不是真实标记,并且可以通过存储时间戳来优化每 5/8 秒的步骤)
再次阅读问题,如果每 8 秒完全重置速率限制,那么这里有一个修改:
last_send
从很久以前的时间戳开始(例如,纪元)。此外,从相同的 5 令牌桶开始。
打破每 5/8 秒规则。
每次发送消息时:首先,检查是否last_send
≥8秒前。如果是,则填充桶(将其设置为5个令牌)。其次,如果桶中有令牌,则发送消息(否则,丢弃/等待/等)。第三,设置last_send
为现在。
这应该适用于那种情况。
我实际上已经使用类似这样的策略(第一种方法)编写了一个 IRC 机器人。它是用 Perl 而不是 Python 编写的,但这里有一些代码可以说明:
这里的第一部分处理将令牌添加到存储桶中。您可以看到基于时间添加令牌的优化(倒数第二行),然后最后一行将存储桶内容限制为最大值(MESSAGE_BURST)
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$conn 是一个传递的数据结构。它位于一个例行运行的方法中(它计算下一次什么时候有事要做,然后休眠一段时间或直到收到网络流量)。该方法的下一部分处理发送。它相当复杂,因为消息具有与之关联的优先级。
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
这是第一个队列,无论如何都会运行。即使我们的连接因泛洪而中断。用于极其重要的事情,例如响应服务器的 PING。接下来是其余的队列:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
最后,bucket 状态被保存回 $conn 数据结构(实际上在方法中稍后;它首先计算多久会有更多工作)
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
如您所见,实际的存储桶处理代码非常小 — 大约四行。其余代码是优先级队列处理。该机器人具有优先级队列,因此,例如,与其聊天的人无法阻止它执行其重要的踢出/禁止任务。
解决方案 4:
为了阻止处理直到消息可以发送,从而排队进一步的消息,antti 的漂亮解决方案也可以修改如下:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
它只是等待,直到有足够的限额来发送消息。为了不以两倍的速率启动,限额也可以初始化为 0。
解决方案 5:
一种解决方案是将时间戳附加到每个队列项,并在 8 秒后丢弃该项目。每次将队列添加到队列时都可以执行此检查。
这仅当您将队列大小限制为 5 并且在队列已满时丢弃任何添加的内容时才有效。
解决方案 6:
保留最后五行发送的时间。保留排队的消息,直到第五条最新消息(如果存在)至少过去 8 秒(以 last_five 作为时间数组):
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
解决方案 7:
如果有人仍然感兴趣,我会将这个简单的可调用类与定时 LRU 键值存储结合使用,以限制每个 IP 的请求率。使用双端队列,但可以重写为与列表一起使用。
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
解决方案 8:
这只是来自已接受答案的代码的 python 实现。
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
解决方案 9:
这个怎么样:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;
private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}
if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}
return false;
}
解决方案 10:
我需要 Scala 的一个变体。它如下:
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
使用方法如下:
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}
解决方案 11:
另一个解决方案
from collections import deque
from datetime import timedelta
from time import sleep
class RateLimiter:
def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
self.items = items
self.per = per
self.deque = deque(maxlen=items)
def count(self):
now = datetime.now()
self.deque.append(now)
def time_to_wait(self) -> timedelta:
if len(self.deque) < self.deque.maxlen:
return timedelta(0)
now = datetime.now()
per = now - self.deque[0]
return max(timedelta(0), self.per - per)
def throttle(self):
sleep(self.time_to_wait().total_seconds())
self.count()
if __name__ == '__main__':
rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))
for i in range(10):
rate_limiter.throttle()
print(f'{i}')
解决方案 12:
java 语法,基本思想:不计算迭代次数,计算跳跃时间。记住上次跳跃时间,等待所需时间不超过跳跃速率
public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
long targetLeapTime = 1_000_000_000 / rate;
rateLock.lock();
try {
long timeSnapshot = nanoTime();
long waitTime = targetLeapTime - (timeSnapshot - leapTime.get());
if (waitTime > 0) {
LockSupport.parkNanos(waitTime);
leapTime.set(timeSnapshot + waitTime);
} else {
leapTime.set(timeSnapshot);
}
} finally {
rateLock.unlock();
}
}
扫码咨询,免费领取项目管理大礼包!