Leaking Bucket Algorithm
Similar to a token bucket except that requests are processed at a fixed rate. It is usually implemented with a queue (FIFO).
When a request arrives, the system checks if the queue is full. If it is not full, the request is added to the queue. Otherwise, the request is dropped. Requests are pulled from the queue and processed at regular intervals.
Leaking Bucket algorithm takes two parameters; Bucket size (usually equal to the queue size) and Outflow rate (how many requests can be processed per second).
package main
import (
"sync"
"time"
)
type LeakyBucket struct {
capacity int64
remaining int64
leakRate time.Duration
lastLeak time.Time
mu sync.Mutex
}
func NewLeakyBucket(capacity int64, leakRate time.Duration) *LeakyBucket {
return &LeakyBucket{
capacity: capacity,
remaining: capacity,
leakRate: leakRate,
lastLeak: time.Now(),
}
}
func (b *LeakyBucket) TryTake(n int64) bool {
b.mu.Lock()
defer b.mu.Unlock()
// Calculate the time since the last leak
now := time.Now()
leaked := int64(now.Sub(b.lastLeak) / b.leakRate)
// Update the bucket's current state
if leaked > 0 {
if leaked >= b.remaining {
b.remaining = b.capacity
} else {
b.remaining += leaked
}
b.lastLeak = now
}
// Try to take n from the bucket
if n > b.remaining {
return false
}
b.remaining -= n
return true
}
func main() {
bucket := NewLeakyBucket(10, time.Second)
for {
if bucket.TryTake(1) {
println("Took 1 from the bucket")
} else {
println("Bucket is empty, waiting...")
}
time.Sleep(100 * time.Millisecond)
}
}