
Go Concurrency Patterns: Worker Pools and Rate Limiting
Worker Pool Implementation
A worker pool is a collection of goroutines that process tasks from a shared work queue. This pattern is ideal for scenarios where you need to process a large number of independent tasks concurrently without creating an unlimited number of goroutines.
package main
import (
"fmt"
"sync"
"time"
)
// Job is a single unit of work handed to a WorkerPool.
type Job struct {
	// ID identifies the job; it is echoed in the result string.
	ID int
	// Data is the job payload; it is echoed in the result string.
	Data string
}

// WorkerPool runs a fixed set of goroutines that take Jobs from a shared
// queue and publish one result string per processed job.
type WorkerPool struct {
	// jobs is the pending-work queue (buffer of 100); Close closes it.
	jobs chan Job
	// results receives one string per processed job (buffer of 100);
	// Close closes it after every worker has exited.
	results chan string
	// wg tracks the running workers so Close can wait for them.
	wg sync.WaitGroup
}

// NewWorkerPool creates a pool and starts its worker goroutines.
//
// numWorkers below 1 is raised to 1. A pool without workers would accept
// jobs into its buffer and never process them, and Close would return
// immediately with those jobs silently dropped.
func NewWorkerPool(numWorkers int) *WorkerPool {
	if numWorkers < 1 {
		numWorkers = 1
	}
	pool := &WorkerPool{
		jobs:    make(chan Job, 100),
		results: make(chan string, 100),
	}
	// Register all workers before starting any of them so that Close can
	// never observe a partially counted WaitGroup.
	pool.wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go pool.worker(i)
	}
	return pool
}

// worker processes jobs until the jobs channel is closed and drained.
// id only labels the worker in the result string.
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	for job := range wp.jobs {
		// Simulate work
		time.Sleep(100 * time.Millisecond)
		// This send blocks while the results buffer is full, so a reader
		// must keep up once more than 100 results are outstanding.
		wp.results <- fmt.Sprintf("Worker %d processed job %d: %s", id, job.ID, job.Data)
	}
}

// Submit queues job for processing. It blocks while the jobs buffer is
// full and panics if called after Close, because Close closes the channel
// it sends on.
func (wp *WorkerPool) Submit(job Job) {
	wp.jobs <- job
}

// Close stops accepting jobs, waits for the workers to finish everything
// already queued, and then closes the results channel so that a range over
// it terminates.
//
// Close blocks until every worker has exited. Workers block when the
// results buffer is full, so a caller expecting more than 100 unread
// results must read them concurrently with Close. Close must be called at
// most once; a second call panics on the already-closed jobs channel.
func (wp *WorkerPool) Close() {
	close(wp.jobs)
	wp.wg.Wait()
	close(wp.results)
}
func main() {
pool := NewWorkerPool(3)
// Submit jobs
for i := 0; i < 10; i++ {
pool.Submit(Job{ID: i, Data: fmt.Sprintf("Data-%d", i)})
}
// Close pool and collect results
pool.Close()
// Print results
for result := range pool.results {
fmt.Println(result)
}
}Rate Limiting Implementation
Rate limiting controls the frequency of operations to prevent system overload. The token bucket algorithm is commonly used for this purpose, allowing bursts of requests followed by controlled pacing.
package main
import (
"fmt"
"sync"
"time"
)
// RateLimiter is a token-bucket limiter. The bucket starts full, every
// allowed request consumes one token, and tokens are added back over time
// at refillRate per second up to maxTokens.
type RateLimiter struct {
	// tokens is the number of tokens currently available.
	tokens int64
	// maxTokens is the bucket capacity, i.e. the largest burst allowed.
	maxTokens int64
	// refillRate is the number of tokens added per second.
	refillRate int64
	// mu guards tokens and lastRefill.
	mu sync.Mutex
	// lastRefill is the instant up to which elapsed time has already been
	// converted into tokens.
	lastRefill time.Time
}

// NewRateLimiter returns a limiter whose bucket holds at most maxTokens
// tokens, starts full, and refills at refillRate tokens per second.
func NewRateLimiter(maxTokens, refillRate int64) *RateLimiter {
	return &RateLimiter{
		tokens:     maxTokens,
		maxTokens:  maxTokens,
		refillRate: refillRate,
		lastRefill: time.Now(),
	}
}

// Allow reports whether a request may proceed now. It consumes one token
// when it returns true and never blocks waiting for a token. It is safe
// for concurrent use.
func (rl *RateLimiter) Allow() bool {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	rl.refill(time.Now())

	if rl.tokens <= 0 {
		return false
	}
	rl.tokens--
	return true
}

// refill credits the tokens earned between lastRefill and now. The caller
// must hold rl.mu.
func (rl *RateLimiter) refill(now time.Time) {
	earned := now.Sub(rl.lastRefill).Seconds() * float64(rl.refillRate)
	if earned < 1 {
		// Not a whole token yet. Leave lastRefill alone so the partial
		// credit keeps accumulating.
		return
	}

	// Compare in floating point before converting: after a very long idle
	// period the earned amount may not fit in an int64.
	if room := rl.maxTokens - rl.tokens; earned >= float64(room) {
		// The bucket is full, so any surplus credit is discarded.
		rl.tokens = rl.maxTokens
		rl.lastRefill = now
		return
	}

	whole := int64(earned)
	rl.tokens += whole
	// Advance only by the time that paid for the whole tokens granted.
	// Jumping to now would throw away the fractional token already earned
	// and make the effective rate lower than refillRate.
	paidFor := float64(whole) / float64(rl.refillRate) * float64(time.Second)
	rl.lastRefill = rl.lastRefill.Add(time.Duration(paidFor))
}
// min returns the smaller of two int64 values.
func min(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
func main() {
// Create rate limiter: 5 tokens per second, max 10 tokens
limiter := NewRateLimiter(10, 5)
// Simulate requests
for i := 0; i < 15; i++ {
if limiter.Allow() {
fmt.Printf("Request %d: Allowed\n", i+1)
} else {
fmt.Printf("Request %d: Rate limited\n", i+1)
}
time.Sleep(200 * time.Millisecond) // Simulate request processing
}
}Advanced Worker Pool with Context Support
Enhancing the worker pool with context support allows for better cancellation and timeout handling, making it suitable for production environments.
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Task describes one unit of work for an AdvancedWorkerPool.
type Task struct {
	// ID identifies the task; it is copied into the Result.
	ID int
	// Payload is the task data.
	Payload string
	// Timeout bounds how long the task may run. It is passed unchanged to
	// context.WithTimeout, so a zero or negative value expires at once.
	Timeout time.Duration
}

// AdvancedWorkerPool is a fixed-size worker pool whose workers run every
// task under a per-task timeout derived from a pool-wide context.
type AdvancedWorkerPool struct {
	// tasks is the pending-work queue (buffer of 100); Close closes it.
	tasks chan Task
	// results receives one Result per processed or rejected task (buffer
	// of 100); Close closes it after every worker has exited.
	results chan Result
	// wg tracks the running workers so Close can wait for them.
	wg sync.WaitGroup
	// ctx is the parent of every per-task context; cancelling it aborts
	// running tasks and stops the workers.
	ctx    context.Context
	cancel context.CancelFunc
}

// Result reports the outcome of one task.
type Result struct {
	// TaskID is the ID of the task this result belongs to.
	TaskID int
	// Success is false when the task timed out, was cancelled, or was
	// rejected because the queue was full.
	Success bool
	// Message is a human-readable description of the outcome.
	Message string
	// Duration is the time spent on the task (0 for rejected tasks).
	Duration time.Duration
}

// NewAdvancedWorkerPool creates a pool and starts its worker goroutines.
//
// numWorkers below 1 is raised to 1, because a pool without workers would
// accept tasks and never process them.
func NewAdvancedWorkerPool(numWorkers int) *AdvancedWorkerPool {
	if numWorkers < 1 {
		numWorkers = 1
	}
	ctx, cancel := context.WithCancel(context.Background())
	pool := &AdvancedWorkerPool{
		tasks:   make(chan Task, 100),
		results: make(chan Result, 100),
		ctx:     ctx,
		cancel:  cancel,
	}
	// Register all workers before starting any of them so that Close can
	// never observe a partially counted WaitGroup.
	pool.wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go pool.worker(i)
	}
	return pool
}

// worker runs tasks until the task queue is closed and drained or the pool
// context is cancelled. id only labels the worker in result messages.
func (awp *AdvancedWorkerPool) worker(id int) {
	defer awp.wg.Done()
	for {
		select {
		case <-awp.ctx.Done():
			return
		case task, ok := <-awp.tasks:
			if !ok {
				return
			}
			// This send blocks while the results buffer is full.
			awp.results <- awp.runTask(id, task)
		}
	}
}

// runTask executes one task under its own timeout and returns the outcome.
// It is a separate function so that the deferred cancel runs as soon as
// the task ends; a defer inside the worker loop would not run until the
// worker exits and would keep every task's context alive until then.
func (awp *AdvancedWorkerPool) runTask(id int, task Task) Result {
	start := time.Now()

	ctx, cancel := context.WithTimeout(awp.ctx, task.Timeout)
	defer cancel()

	success := awp.processTask(ctx, task)
	return Result{
		TaskID:   task.ID,
		Success:  success,
		Message:  fmt.Sprintf("Worker %d completed task", id),
		Duration: time.Since(start),
	}
}

// processTask simulates 500ms of work. It returns true when the work
// finished and false when ctx was cancelled or timed out first.
func (awp *AdvancedWorkerPool) processTask(ctx context.Context, task Task) bool {
	// An explicit timer can be stopped when ctx wins the race, so it does
	// not stay pending for the rest of the 500ms.
	timer := time.NewTimer(500 * time.Millisecond)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// Submit queues task without blocking on the task queue. When the queue is
// full the task is not queued; a failed Result with Message "Queue full"
// is sent to the results channel instead, and that send blocks while the
// results buffer is full. Submit panics if called after Close, because
// Close closes the channel it sends on.
func (awp *AdvancedWorkerPool) Submit(task Task) {
	select {
	case awp.tasks <- task:
	default:
		// Handle queue full scenario
		awp.results <- Result{
			TaskID:   task.ID,
			Success:  false,
			Message:  "Queue full",
			Duration: 0,
		}
	}
}

// Close shuts the pool down gracefully: it stops accepting tasks, waits
// for the workers to finish everything already queued, releases the pool
// context, and closes the results channel so that a range over it
// terminates.
//
// Close blocks until every worker has exited. Workers block when the
// results buffer is full, so a caller expecting more than 100 unread
// results must read them concurrently with Close. Close must be called at
// most once; a second call panics on the already-closed task queue.
func (awp *AdvancedWorkerPool) Close() {
	close(awp.tasks)
	// Wait before cancelling. Cancelling first would abort in-flight tasks
	// and let workers exit while tasks are still queued, dropping them.
	awp.wg.Wait()
	awp.cancel()
	close(awp.results)
}
func main() {
pool := NewAdvancedWorkerPool(3)
// Submit tasks
tasks := []Task{
{ID: 1, Payload: "task1", Timeout: 1 * time.Second},
{ID: 2, Payload: "task2", Timeout: 1 * time.Second},
{ID: 3, Payload: "task3", Timeout: 1 * time.Second},
}
for _, task := range tasks {
pool.Submit(task)
}
// Collect results
go func() {
pool.Close()
}()
for result := range pool.results {
fmt.Printf("Task %d: %s (Duration: %v)\n",
result.TaskID, result.Message, result.Duration)
}
}Performance Comparison
| Pattern | Concurrent Workers | Resource Usage | Throughput | Complexity |
|---|---|---|---|---|
| Basic Worker Pool | Fixed number | Low | High | Low |
| Rate Limited Pool | Variable | Medium | Controlled | Medium |
| Advanced Pool | Configurable | High | Optimized | High |
The basic worker pool is ideal for simple scenarios where you want to process tasks in parallel. Rate limiting adds control over resource consumption, making it suitable for external API calls or database operations. The advanced pool provides full control with context support, making it production-ready for complex applications.
Best Practices
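- Use buffered channels to reduce goroutine blocking; a full buffer still blocks the sender, so size it for the expected backlog or read results concurrently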
- Implement proper cleanup with context cancellation
- Monitor resource usage to adjust worker count dynamically
- Handle errors gracefully in worker functions
- Set appropriate timeouts to prevent hanging operations
