Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

🏠 Back to Blog

Leaking Bucket Algorithm

The leaking bucket algorithm is similar to the token bucket algorithm, except that requests are processed at a fixed rate. It is usually implemented with a first-in, first-out (FIFO) queue.

When a request arrives, the system checks if the queue is full. If it is not full, the request is added to the queue. Otherwise, the request is dropped. Requests are pulled from the queue and processed at regular intervals.

The leaking bucket algorithm takes two parameters: bucket size (usually equal to the queue size) and outflow rate (how many requests can be processed per second).

package main

import (
	"sync"
	"time"
)

// LeakyBucket is a mutex-guarded rate limiter that tracks how many units are
// currently available. Units are consumed by TryTake and become available
// again at a rate of one unit per leakRate, never exceeding capacity.
type LeakyBucket struct {
	capacity  int64         // maximum number of units the bucket can hold
	remaining int64         // units currently available to take
	leakRate  time.Duration // time it takes for one unit to become available again
	lastLeak  time.Time     // instant up to which elapsed time has been credited
	mu        sync.Mutex    // guards remaining and lastLeak
}

// NewLeakyBucket returns a bucket that starts with all capacity units
// available and restores one unit every leakRate.
//
// A leakRate that is zero or negative yields a bucket that never restores
// units.
func NewLeakyBucket(capacity int64, leakRate time.Duration) *LeakyBucket {
	return &LeakyBucket{
		capacity:  capacity,
		remaining: capacity,
		leakRate:  leakRate,
		lastLeak:  time.Now(),
	}
}

// TryTake attempts to take n units from the bucket without blocking.
//
// It first credits the units restored since the previous call and then
// reports whether n units were available; on success they are deducted.
// A negative n is rejected (false) so callers cannot inflate the bucket.
// TryTake(0) succeeds whenever the bucket has a non-negative balance.
func (b *LeakyBucket) TryTake(n int64) bool {
	if n < 0 {
		return false
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	b.refill(time.Now())

	if n > b.remaining {
		return false
	}
	b.remaining -= n
	return true
}

// refill credits the whole leak intervals that elapsed between lastLeak and
// now, capping the balance at capacity. The caller must hold b.mu.
func (b *LeakyBucket) refill(now time.Time) {
	// A non-positive rate would divide by zero or produce a negative count;
	// treat it as "never restores".
	if b.leakRate <= 0 {
		return
	}

	leaked := int64(now.Sub(b.lastLeak) / b.leakRate)
	if leaked <= 0 {
		return
	}

	// Compare against the free room rather than adding first, so the balance
	// can neither exceed capacity nor overflow.
	if room := b.capacity - b.remaining; leaked >= room {
		b.remaining = b.capacity
	} else {
		b.remaining += leaked
	}

	// Advance only by the intervals actually credited so the leftover
	// fraction of an interval still counts toward the next unit.
	b.lastLeak = b.lastLeak.Add(time.Duration(leaked) * b.leakRate)
}

// main is a small demo: it creates a bucket of 10 units with a one-second
// leak rate and then, forever, tries to take one unit every 100ms, printing
// whether the attempt succeeded.
func main() {
	limiter := NewLeakyBucket(10, time.Second)
	for {
		// Assume failure and overwrite the message when the take succeeds.
		msg := "Bucket is empty, waiting..."
		if limiter.TryTake(1) {
			msg = "Took 1 from the bucket"
		}
		println(msg)
		time.Sleep(100 * time.Millisecond)
	}
}