什么是好的速率限制算法?

2025-02-17 09:25:00
admin
原创
62
摘要:问题描述:我可以使用一些伪代码,或者更好的 Python。我正在尝试为 Python IRC 机器人实现速率限制队列,并且它部分有效,但如果有人触发的消息少于限制(例如,速率限制为每 8 秒 5 条消息,而该人仅触发 4 条),并且下一次触发超过 8 秒(例如,16 秒后),机器人会发送消息,但队列已满,机器...

问题描述:

我可以使用一些伪代码,或者更好的 Python。我正在尝试为 Python IRC 机器人实现速率限制队列,并且它部分有效,但如果有人触发的消息少于限制(例如,速率限制为每 8 秒 5 条消息,而该人仅触发 4 条),并且下一次触发超过 8 秒(例如,16 秒后),机器人会发送消息,但队列已满,机器人会等待 8 秒,即使由于 8 秒期限已过,因此不需要等待。


解决方案 1:

这里是最简单的算法,如果您只想在消息到达太快时丢弃它们(而不是将它们排队,这是有意义的,因为队列可能会变得任意大):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

此解决方案中没有数据结构、计时器等,并且运行干净 :) 可以看到,“限额”以每秒最多 5/8 个单位的速度增长,即每八秒最多五个单位。每转发一条消息都会扣除一个单位,因此每八秒您不能发送超过五条消息。

请注意,rate应为整数,即没有非零小数部分,否则算法将无法正常工作(实际利率将不会是rate/per)。例如,由于永远不会增长到 1.0,所以rate=0.5; per=1.0;不起作用。但工作正常。allowance`rate=1.0; per=2.0;`

解决方案 2:

在入队函数之前使用此装饰器@RateLimited(ratepersec)。

基本上,这会检查自上次以来是否已过去 1/rate 秒,如果没有,则等待剩余时间,否则不等待。这实际上限制了您的速率/秒。装饰器可以应用于您想要限制速率的任何函数。

在您的情况下,如果您希望每 8 秒最多发送 5 条消息,请在 sendToQueue 函数之前使用 @RateLimited(0.625)。

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

解决方案 3:

令牌桶的实现相当简单。

从一个装有 5 个代币的桶开始。

每 5/8 秒:如果存储桶中的令牌少于 5 个,则添加一个。

每次要发送消息时:如果存储桶中有≥1个令牌,则取出一个令牌并发送消息。否则,等待/丢弃消息/等等。

(显然,在实际代码中,您将使用整数计数器而不是真实标记,并且可以通过存储时间戳来优化每 5/8 秒的步骤)


再次阅读问题,如果每 8 秒完全重置速率限制,那么这里有一个修改:

last_send从很久以前的时间戳开始(例如,纪元)。此外,从相同的 5 令牌桶开始。

打破每 5/8 秒规则。

每次发送消息时:首先,检查是否last_send≥8秒前。如果是,则填充桶(将其设置为5个令牌)。其次,如果桶中有令牌,则发送消息(否则,丢弃/等待/等)。第三,设置last_send为现在。

这应该适用于那种情况。


我实际上已经使用类似这样的策略(第一种方法)编写了一个 IRC 机器人。它是用 Perl 而不是 Python 编写的,但这里有一些代码可以说明:

这里的第一部分处理将令牌添加到存储桶中。您可以看到基于时间添加令牌的优化(倒数第二行),然后最后一行将存储桶内容限制为最大值(MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$conn 是一个传递的数据结构。它位于一个例行运行的方法中(它计算下一次什么时候有事要做,然后休眠一段时间或直到收到网络流量)。该方法的下一部分处理发送。它相当复杂,因为消息具有与之关联的优先级。

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

这是第一个队列,无论如何都会运行。即使我们的连接因泛洪而中断。用于极其重要的事情,例如响应服务器的 PING。接下来是其余的队列:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

最后,bucket 状态被保存回 $conn 数据结构(实际上在方法中稍后;它首先计算多久会有更多工作)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

如您所见,实际的存储桶处理代码非常小 — 大约四行。其余代码是优先级队列处理。该机器人具有优先级队列,因此,例如,与其聊天的人无法阻止它执行其重要的踢出/禁止任务。

解决方案 4:

为了阻止处理直到消息可以发送,从而排队进一步的消息,antti 的漂亮解决方案也可以修改如下:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

它只是等待,直到有足够的限额来发送消息。为了不以两倍的速率启动,限额也可以初始化为 0。

解决方案 5:

一种解决方案是将时间戳附加到每个队列项,并在 8 秒后丢弃该项目。每次将队列添加到队列时都可以执行此检查。

这仅当您将队列大小限制为 5 并且在队列已满时丢弃任何添加的内容时才有效。

解决方案 6:

保留最后五行发送的时间。保留排队的消息,直到第五条最新消息(如果存在)至少过去 8 秒(以 last_five 作为时间数组):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

解决方案 7:

如果有人仍然感兴趣,我会将这个简单的可调用类与定时 LRU 键值存储结合使用,以限制每个 IP 的请求率。使用双端队列,但可以重写为与列表一起使用。

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

解决方案 8:

这只是来自已接受答案的代码的 python 实现。

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

解决方案 9:

这个怎么样:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

解决方案 10:

我需要 Scala 的一个变体。它如下:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

使用方法如下:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}

解决方案 11:

另一个解决方案

from collections import deque
from datetime import timedelta
from time import sleep

class RateLimiter:
    def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
        self.items = items
        self.per = per
        self.deque = deque(maxlen=items)

    def count(self):
        now = datetime.now()
        self.deque.append(now)

    def time_to_wait(self) -> timedelta:
        if len(self.deque) < self.deque.maxlen:
            return timedelta(0)
        now = datetime.now()
        per = now - self.deque[0]
        return max(timedelta(0), self.per - per)

    def throttle(self):
        sleep(self.time_to_wait().total_seconds())
        self.count()

if __name__ == '__main__':
    rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))

    for i in range(10):
        rate_limiter.throttle()
        print(f'{i}')

解决方案 12:

java 语法,基本思想:不计算迭代次数,计算跳跃时间。记住上次跳跃时间,等待所需时间不超过跳跃速率

public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
    long targetLeapTime = 1_000_000_000 / rate;
    rateLock.lock();
    try {
        long timeSnapshot = nanoTime();
        long waitTime = targetLeapTime - (timeSnapshot - leapTime.get());
        if (waitTime > 0) {

            LockSupport.parkNanos(waitTime);

            leapTime.set(timeSnapshot + waitTime);
        } else {
            leapTime.set(timeSnapshot);
        }
    } finally {
        rateLock.unlock();
    }
}
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2545  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1551  
  建筑工程全生命周期涉及从项目规划、设计、施工到运营维护等多个复杂阶段,每个阶段都产生和依赖大量信息。PLM(产品生命周期管理)系统作为一种整合数据、流程和人员的数字化解决方案,正逐渐成为建筑行业实现高效协同与可持续发展的关键支撑。通过数字化转型,PLM系统能够优化各阶段的工作流程,提升项目整体质量和效率,为建筑工程的全...
plm是什么软件   1  
  产品生命周期管理(PLM)系统在企业资源成本率优化方面发挥着至关重要的作用。通过构建有效的数据模型,PLM系统能够整合企业各个环节的数据,为资源成本的精准分析和优化提供有力支持。这不仅有助于企业降低成本,还能提升产品质量和市场竞争力。PLM系统概述PLM系统是一种用于管理产品从概念设计到退役全生命周期过程中所有信息和流...
PLM项目管理软件   1  
  产品生命周期管理(PLM)系统在现代企业的产品研发、生产与管理过程中扮演着至关重要的角色。它整合了从产品概念设计到产品退役的全生命周期数据与流程,助力企业提升效率、降低成本并增强创新能力。随着技术的不断发展,到 2025 年,PLM 系统将具备一系列核心功能模块,这些模块将深度影响企业的运营与发展。产品数据管理模块产品...
plm是什么意思   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用