I could use some pseudo-code, or better, Python. I am trying to implement a rate-limiting queue for a Python IRC bot, and it partially works. The problem: if someone triggers fewer messages than the limit (e.g., the rate limit is 5 messages per 8 seconds and the person triggers only 4), and the next trigger comes more than 8 seconds later (e.g., 16 seconds later), the bot sends the message, but the queue becomes full and the bot waits 8 seconds, even though that's not needed since the 8-second period has elapsed.
A Token Bucket is fairly simple to implement.
Start with a bucket with 5 tokens.
Every 8/5 seconds (i.e., 5 tokens per 8 seconds): If the bucket has fewer than 5 tokens, add one.
Each time you want to send a message: If the bucket has ≥1 token, take one token out and send the message. Otherwise, wait/drop the message/whatever.
(Obviously, in actual code, you'd use an integer counter instead of real tokens, and you can optimize out the every-8/5-seconds step by storing timestamps.)
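As a rough illustration of the steps above, here is a minimal Python sketch with an integer counter and a stored timestamp in place of a periodic timer. The class name `TokenBucket`, the method `try_send`, and the injectable `clock` parameter are assumptions made for this example, and the refill interval is taken to be one token every 8/5 = 1.6 seconds, which is what 5 messages per 8 seconds works out to.

```python
import time


class TokenBucket:
    """Integer token bucket; tokens are credited lazily from a timestamp."""

    def __init__(self, capacity=5, interval=8.0 / 5, clock=time.monotonic):
        self.capacity = capacity      # bucket size (maximum burst)
        self.interval = interval      # seconds per refilled token
        self.clock = clock            # injectable so the class can be tested
        self.tokens = capacity        # integer counter, starts full
        self.last_refill = clock()    # time the most recent token was credited

    def _refill(self):
        now = self.clock()
        # Whole refill intervals that have elapsed since the last credit.
        new_tokens = int((now - self.last_refill) // self.interval)
        if new_tokens > 0:
            self.tokens = min(self.capacity, self.tokens + new_tokens)
            # Advance by whole intervals only, so partial progress is kept.
            self.last_refill += new_tokens * self.interval

    def try_send(self):
        """Return True and take a token if a message may be sent now."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A caller would check `bucket.try_send()` before each message and wait or drop the message when it returns False.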
Reading the question again, if the rate limit is fully reset each 8 seconds, then here is a modification:
Start with a timestamp, last_send, at a time long ago (e.g., at the epoch). Also, start with the same 5-token bucket.
Drop the periodic refill rule.
Each time you send a message: First, check if last_send is ≥ 8 seconds ago. If so, fill the bucket (set it to 5 tokens). Second, if there are tokens in the bucket, take one and send the message (otherwise, drop/wait/etc.). Third, set last_send to now.
That should work for that scenario.
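The modified rules might look like this in Python. This is only a sketch: the names `ResettingBucket` and `try_send` and the `clock` parameter are made up for the example, and it assumes last_send is updated only when a message is actually sent (the description leaves open what happens on a dropped message).

```python
import time


class ResettingBucket:
    """Bucket that is refilled completely once `period` seconds pass without a send."""

    def __init__(self, capacity=5, period=8.0, clock=time.monotonic):
        self.capacity = capacity
        self.period = period
        self.clock = clock                  # injectable so the class can be tested
        self.tokens = capacity
        self.last_send = float("-inf")      # "a time long ago"

    def try_send(self):
        now = self.clock()
        # First: fill the bucket if the last send was at least `period` ago.
        if now - self.last_send >= self.period:
            self.tokens = self.capacity
        # Second: spend a token if one is available, otherwise refuse.
        if self.tokens < 1:
            return False
        self.tokens -= 1
        # Third: remember when we last sent (assumption: only on a real send).
        self.last_send = now
        return True
```

With the scenario from the question (4 messages, then a trigger 16 seconds later), the later trigger finds a full bucket and goes out without any extra wait.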
I've actually written an IRC bot using a strategy like this (the first approach). It's in Perl, not Python, but here is some code to illustrate:
The first part here handles adding tokens to the bucket. You can see the optimization of adding tokens based on elapsed time (second-to-last line), and then the last line clamps the bucket contents to the maximum (MESSAGE_BURST).
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$conn is a data structure which is passed around. This is inside a method that runs routinely (it calculates when the next time it'll have something to do, and sleeps either that long or until it gets network traffic). The next part of the method handles sending. It is rather complicated, because messages have priorities associated with them.
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
    # Ultimate is special. We run ultimate no matter what. Even if
    # it sends the bucket negative.
    --$bucket;
    $entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
That's the first queue, which is run no matter what, even if it gets our connection killed for flooding. It's used for extremely important things, like responding to the server's PING. Next, the rest of the queues:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
    my $queue = $queues->[$pri];
    while (scalar(@$queue)) {
        if ($bucket < 1) {
            # continue later.
            $need_more_time = 1;
            last QRUN;
        } else {
            --$bucket;
            my $entry = shift @$queue;
            $entry->{code}(@{$entry->{args}});
        }
    }
}
Finally, the bucket status is saved back to the $conn data structure (actually a bit later in the method; it first calculates how soon it'll have more work).
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
As you can see, the actual bucket handling code is very small, about four lines. The rest of the code is priority queue handling. The bot has priority queues so that, e.g., someone chatting with it can't prevent it from doing its important kick/ban duties.
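Since the question asked for Python, here is a loose Python rendering of the same flow (refill and clamp, run the ultimate queue unconditionally, then drain the other queues while tokens remain). It is a sketch, not a translation of the actual bot: the function name `run_queues`, its parameters, and the numeric priority values are assumptions, and each queue entry is assumed to be a zero-argument callable.

```python
from collections import deque

# Hypothetical priority values; only their relative order matters here.
PRIORITY_JUNK, PRIORITY_HIGH, PRIORITY_ULTIMATE = 0, 1, 2


def run_queues(queues, bucket, burst, elapsed, interval):
    """Drain `queues` (a list of deques indexed by priority).

    Returns (bucket, need_more_time): the updated token count and whether
    messages were left behind because the bucket ran dry.
    """
    # Add tokens for the elapsed time, clamped to the burst size.
    bucket = min(burst, bucket + elapsed / interval)

    # Ultimate entries always run, even if that drives the bucket negative.
    ultimate = queues[PRIORITY_ULTIMATE]
    while ultimate:
        bucket -= 1
        ultimate.popleft()()

    # The remaining queues, highest priority first, while tokens remain.
    for pri in range(PRIORITY_HIGH, PRIORITY_JUNK - 1, -1):
        queue = queues[pri]
        while queue:
            if bucket < 1:
                return bucket, True   # continue later
            bucket -= 1
            queue.popleft()()
    return bucket, False
```

The caller would store the returned bucket value and the current time, and pass the elapsed time on the next run, much as the Perl code saves its state back into $conn.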
One solution is to attach a timestamp to each queue item and to discard the item after 8 seconds have passed. You can perform this check each time the queue is added to.
This only works if you limit the queue size to 5 and discard any additions whilst the queue is full.
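One possible reading of this idea in Python is sketched below. The class name `ExpiringQueue`, its `add` method, and the `clock` parameter are invented for the example; it assumes that an entry in the queue stands for a message counted against the limit, and that entries are purged whenever something is added.

```python
import time
from collections import deque


class ExpiringQueue:
    """Bounded queue whose entries expire `ttl` seconds after being added."""

    def __init__(self, max_items=5, ttl=8.0, clock=time.monotonic):
        self.max_items = max_items
        self.ttl = ttl
        self.clock = clock         # injectable so the class can be tested
        self.items = deque()       # (timestamp, message) pairs, oldest first

    def add(self, message):
        """Return True if the message was accepted, False if the queue is full."""
        now = self.clock()
        # Discard entries that are at least `ttl` seconds old.
        while self.items and now - self.items[0][0] >= self.ttl:
            self.items.popleft()
        # Refuse additions while the queue is full.
        if len(self.items) >= self.max_items:
            return False
        self.items.append((now, message))
        return True
```

Under this reading, 4 messages followed by a trigger 16 seconds later no longer leave the queue full, because the 4 old entries are purged before the new one is added.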
Keep the times at which the last five lines were sent. Hold the queued messages until the fifth-most-recent send (if there is one) is at least 8 seconds in the past (with last_five as a list of times, newest first):
import time

now = time.time()
# last_five[-1] is the oldest recorded send. Send while fewer than five
# sends are recorded, or once the oldest one is at least 8 seconds old.
if len(last_five) < 5 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
Use this decorator @RateLimited(ratepersec) before your function that enqueues.
Basically, this checks if 1/rate secs have passed since the last time and if not, waits the remainder of the time, otherwise it doesn't wait. This effectively limits you to rate/sec. The decorator can be applied to any function you want rate-limited.
In your case, if you want a maximum of 5 messages per 8 seconds, use @RateLimited(0.625) before your sendToQueue function.
import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]  # a list, so the inner function can update it
        def rateLimitedFunction(*args, **kargs):
            # time.monotonic() measures elapsed wall-clock time;
            # time.clock() was removed in Python 3.8.
            elapsed = time.monotonic() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.monotonic()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print(num)

if __name__ == "__main__":
    print("This should print 1,2,3... at about 2 per second.")
    for i in range(1, 100):
        PrintNumber(i)
Here is the simplest algorithm, if you just want to drop messages when they arrive too quickly (instead of queuing them, which makes sense because the queue might get arbitrarily large):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
    current = now();
    time_passed = current - last_check;
    last_check = current;
    allowance += time_passed * (rate / per);
    if (allowance > rate):
        allowance = rate; // throttle
    if (allowance < 1.0):
        discard_message();
    else:
        forward_message();
        allowance -= 1.0;
There are no data structures, timers, etc. in this solution and it works cleanly :) To see this, note that 'allowance' grows at a speed of at most 5/8 units per second, i.e. at most five units per eight seconds. Every message that is forwarded deducts one unit, so over the long run you can't send more than five messages per eight seconds on average (a full allowance still permits a burst of up to five messages at once).
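For reference, the pseudo-code above could be written in Python roughly as follows. The variable names follow the pseudo-code; the class name `AllowanceLimiter`, the method `allow`, and the `clock` parameter are assumptions added so the sketch is self-contained and testable.

```python
import time


class AllowanceLimiter:
    """Continuous 'allowance' limiter: drops messages that arrive too quickly."""

    def __init__(self, rate=5.0, per=8.0, clock=time.monotonic):
        self.rate = rate               # unit: messages
        self.per = per                 # unit: seconds
        self.clock = clock             # injectable so the class can be tested
        self.allowance = rate          # unit: messages
        self.last_check = clock()      # unit: seconds

    def allow(self):
        """Return True to forward the message, False to discard it."""
        current = self.clock()
        time_passed = current - self.last_check
        self.last_check = current
        self.allowance += time_passed * (self.rate / self.per)
        if self.allowance > self.rate:
            self.allowance = self.rate   # throttle
        if self.allowance < 1.0:
            return False
        self.allowance -= 1.0
        return True
```

The message handler would call `limiter.allow()` once per incoming message and forward the message only when it returns True.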
How about this:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }
    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }
    return false;
}