Worker Pool Implementation

A worker pool is a collection of goroutines that process tasks from a shared work queue. This pattern is ideal for scenarios where you need to process a large number of independent tasks concurrently without creating an unlimited number of goroutines.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Job is one unit of work for the pool: a numeric identifier plus an
// opaque string payload that workers echo back in their result.
type Job struct {
    ID   int
    Data string
}

// WorkerPool fans Jobs out to a fixed set of worker goroutines over a
// shared channel and collects their formatted output on results.
// Construct it with NewWorkerPool; the zero value is not usable.
type WorkerPool struct {
    jobs    chan Job // buffered work queue; closed by Close
    results chan string // buffered worker output; closed by Close after workers exit
    wg      sync.WaitGroup // counts running workers so Close can wait for them
}

// NewWorkerPool creates a pool backed by buffered job and result
// channels and launches numWorkers goroutines consuming the job queue.
func NewWorkerPool(numWorkers int) *WorkerPool {
    wp := &WorkerPool{
        jobs:    make(chan Job, 100),
        results: make(chan string, 100),
    }

    // Spin up the fixed worker set; each registers with the WaitGroup
    // so Close can wait for the queue to drain.
    for id := 0; id < numWorkers; id++ {
        wp.wg.Add(1)
        go wp.worker(id)
    }

    return wp
}

// worker drains the job queue until it is closed, publishing one
// formatted result per job. id identifies the worker in its output.
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    for j := range wp.jobs {
        time.Sleep(100 * time.Millisecond) // simulate processing cost
        wp.results <- fmt.Sprintf("Worker %d processed job %d: %s", id, j.ID, j.Data)
    }
}

// Submit enqueues one job for processing. It blocks once the job
// buffer (capacity 100) is full, and must not be called after Close:
// Close closes the jobs channel, so a later send would panic.
func (wp *WorkerPool) Submit(job Job) {
    wp.jobs <- job
}

// Close shuts the pool down: it closes the job queue, waits for every
// worker to finish, then closes the results channel so a consumer's
// range loop terminates.
//
// NOTE(review): wg.Wait() can deadlock if workers block sending to a
// full results buffer (cap 100) while nothing is draining it — confirm
// callers either read results concurrently with Close or never produce
// more results than the buffer holds.
func (wp *WorkerPool) Close() {
    close(wp.jobs)
    wp.wg.Wait()
    close(wp.results)
}

// main demonstrates the worker pool: submit 10 jobs, then shut the
// pool down while draining results concurrently.
func main() {
    pool := NewWorkerPool(3)

    // Submit jobs
    for i := 0; i < 10; i++ {
        pool.Submit(Job{ID: i, Data: fmt.Sprintf("Data-%d", i)})
    }

    // Close in a separate goroutine so this goroutine can drain results
    // while workers finish. Calling Close synchronously here (as the
    // original did) deadlocks as soon as the results buffer fills before
    // Close's wg.Wait() returns — workers block on the send, Wait never
    // completes, and nobody is reading results yet.
    go pool.Close()

    // The range ends when Close closes the results channel.
    for result := range pool.results {
        fmt.Println(result)
    }
}

Rate Limiting Implementation

Rate limiting controls the frequency of operations to prevent system overload. The token bucket algorithm is commonly used for this purpose, allowing bursts of requests followed by controlled pacing.

package main

import (
    "fmt"
    "sync"
    "time"
)

// RateLimiter implements a token bucket: each request consumes one
// token, tokens refill continuously over time, and bursts up to
// maxTokens are allowed while the long-run average is capped at
// refillRate requests per second. Safe for concurrent use.
type RateLimiter struct {
    tokens     int64 // currently available tokens; guarded by mu
    maxTokens  int64 // bucket capacity (maximum burst size)
    refillRate int64 // tokens per second
    mu         sync.Mutex // guards tokens and lastRefill
    lastRefill time.Time // last instant tokens were credited; guarded by mu
}

// NewRateLimiter returns a limiter whose bucket starts full at
// maxTokens and refills at refillRate tokens per second.
func NewRateLimiter(maxTokens, refillRate int64) *RateLimiter {
    limiter := RateLimiter{
        tokens:     maxTokens,
        maxTokens:  maxTokens,
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
    return &limiter
}

// Allow reports whether one request may proceed, consuming a token
// when it does. Tokens are credited lazily based on the time elapsed
// since the last refill, capped at maxTokens. Safe for concurrent use.
func (rl *RateLimiter) Allow() bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    // Refill tokens based on elapsed time.
    now := time.Now()
    elapsed := now.Sub(rl.lastRefill).Seconds()
    tokensToAdd := int64(elapsed * float64(rl.refillRate))

    if tokensToAdd > 0 {
        rl.tokens = min(rl.tokens+tokensToAdd, rl.maxTokens)
        if rl.tokens == rl.maxTokens {
            // Bucket is full: time spent full cannot bank extra tokens.
            rl.lastRefill = now
        } else {
            // Advance lastRefill only by the time the credited tokens
            // account for. The original snapped to `now`, discarding the
            // fractional remainder and under-refilling by up to one token
            // per refill event — a steady drift at high call rates.
            credited := time.Duration(float64(tokensToAdd) / float64(rl.refillRate) * float64(time.Second))
            rl.lastRefill = rl.lastRefill.Add(credited)
        }
    }

    // Spend one token if any are available.
    if rl.tokens > 0 {
        rl.tokens--
        return true
    }

    return false
}

// min returns the smaller of two int64 values.
func min(a, b int64) int64 {
    if b < a {
        return b
    }
    return a
}

// main exercises the limiter: 15 requests spaced 200ms apart against a
// bucket of 10 tokens refilling at 5 tokens per second.
func main() {
    // Create rate limiter: 5 tokens per second, max 10 tokens
    limiter := NewRateLimiter(10, 5)

    // Fire requests and report each verdict.
    for req := 0; req < 15; req++ {
        allowed := limiter.Allow()
        if allowed {
            fmt.Printf("Request %d: Allowed\n", req+1)
        } else {
            fmt.Printf("Request %d: Rate limited\n", req+1)
        }
        time.Sleep(200 * time.Millisecond) // Simulate request processing
    }
}

Advanced Worker Pool with Context Support

Enhancing the worker pool with context support allows for better cancellation and timeout handling, making it suitable for production environments.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Task is one unit of work for the advanced pool. Timeout bounds how
// long a worker may spend processing it before the per-task context
// is cancelled.
type Task struct {
    ID      int
    Payload string
    Timeout time.Duration
}

// AdvancedWorkerPool extends the basic pool with a cancellable root
// context so in-flight work can be aborted. Construct it with
// NewAdvancedWorkerPool; the zero value is not usable.
//
// NOTE(review): storing a context in a struct is discouraged by Go
// convention (contexts are normally passed per call) — acceptable here
// as the pool owns the workers' lifetime, but worth confirming.
type AdvancedWorkerPool struct {
    tasks   chan Task // buffered task queue; closed by Close
    results chan Result // buffered results; closed by Close after workers exit
    wg      sync.WaitGroup // counts running workers
    ctx     context.Context // root context shared by all workers
    cancel  context.CancelFunc // cancels ctx; invoked during Close
}

// Result describes the outcome of one Task: which task it was, whether
// processing succeeded, a human-readable message, and how long the
// processing took.
type Result struct {
    TaskID   int
    Success  bool
    Message  string
    Duration time.Duration
}

// NewAdvancedWorkerPool builds a pool with a cancellable root context
// and starts numWorkers goroutines consuming from the task queue.
func NewAdvancedWorkerPool(numWorkers int) *AdvancedWorkerPool {
    ctx, cancel := context.WithCancel(context.Background())
    awp := &AdvancedWorkerPool{
        tasks:   make(chan Task, 100),
        results: make(chan Result, 100),
        ctx:     ctx,
        cancel:  cancel,
    }

    // Each worker registers with the WaitGroup so Close can block
    // until all of them have exited.
    for id := 0; id < numWorkers; id++ {
        awp.wg.Add(1)
        go awp.worker(id)
    }

    return awp
}

// worker processes tasks until the queue is closed or the pool context
// is cancelled. Each task runs under its own timeout derived from the
// pool context and yields exactly one Result.
func (awp *AdvancedWorkerPool) worker(id int) {
    defer awp.wg.Done()
    for {
        select {
        case <-awp.ctx.Done():
            return
        case task, ok := <-awp.tasks:
            if !ok {
                return
            }
            start := time.Now()

            // Per-task timeout context. Cancel it as soon as the task
            // finishes: the original used `defer cancel()` inside this
            // loop, which only runs when the worker RETURNS — leaking one
            // timer/context per processed task for the goroutine's
            // lifetime.
            ctx, cancel := context.WithTimeout(awp.ctx, task.Timeout)
            success := awp.processTask(ctx, task)
            cancel()

            duration := time.Since(start)

            awp.results <- Result{
                TaskID:   task.ID,
                Success:  success,
                Message:  fmt.Sprintf("Worker %d completed task", id),
                Duration: duration,
            }
        }
    }
}

// processTask simulates a fixed 500ms unit of work that honors
// cancellation: it returns true if the simulated work finishes first,
// false if ctx is cancelled or times out before then. The task payload
// is currently unused by the simulation.
func (awp *AdvancedWorkerPool) processTask(ctx context.Context, task Task) bool {
    // Simulate work that respects context cancellation
    select {
    case <-time.After(500 * time.Millisecond):
        return true
    case <-ctx.Done():
        return false
    }
}

// Submit enqueues a task without blocking. If the task buffer is full
// the task is rejected immediately and a failure Result with message
// "Queue full" is emitted on the results channel instead.
//
// NOTE(review): the rejection path itself sends on results, which CAN
// block if the results buffer is also full — confirm the results
// consumer always runs concurrently with submitters.
func (awp *AdvancedWorkerPool) Submit(task Task) {
    select {
    case awp.tasks <- task:
    default:
        // Handle queue full scenario
        awp.results <- Result{
            TaskID:   task.ID,
            Success:  false,
            Message:  "Queue full",
            Duration: 0,
        }
    }
}

// Close performs a graceful shutdown: it stops accepting tasks, lets
// the workers drain everything already queued, then cancels the pool
// context and closes the results channel so consumers' range loops end.
func (awp *AdvancedWorkerPool) Close() {
    close(awp.tasks)
    // Wait BEFORE cancelling. The original called cancel() first, which
    // raced the workers' select against ctx.Done() and could make them
    // exit while tasks were still sitting in the queue, silently
    // dropping queued work.
    awp.wg.Wait()
    awp.cancel() // workers are done; release the context's resources
    close(awp.results)
}

// main demonstrates the advanced pool: submit three tasks, shut the
// pool down in the background, and print each result as it arrives.
func main() {
    pool := NewAdvancedWorkerPool(3)

    // Three independent tasks, each allowed one second to run.
    tasks := []Task{
        {ID: 1, Payload: "task1", Timeout: 1 * time.Second},
        {ID: 2, Payload: "task2", Timeout: 1 * time.Second},
        {ID: 3, Payload: "task3", Timeout: 1 * time.Second},
    }
    for _, t := range tasks {
        pool.Submit(t)
    }

    // Shut down in the background so the loop below can drain results
    // while the workers finish.
    go pool.Close()

    // Ends when Close closes the results channel.
    for r := range pool.results {
        fmt.Printf("Task %d: %s (Duration: %v)\n",
            r.TaskID, r.Message, r.Duration)
    }
}

Performance Comparison

| Pattern           | Concurrent Workers | Resource Usage | Throughput | Complexity |
|-------------------|--------------------|----------------|------------|------------|
| Basic Worker Pool | Fixed number       | Low            | High       | Low        |
| Rate Limited Pool | Variable           | Medium         | Controlled | Medium     |
| Advanced Pool     | Configurable       | High           | Optimized  | High       |

The basic worker pool is ideal for simple scenarios where you want to process tasks in parallel. Rate limiting adds control over resource consumption, making it suitable for external API calls or database operations. The advanced pool provides full control with context support, making it production-ready for complex applications.

Best Practices

  1. Use buffered channels to prevent goroutine blocking
  2. Implement proper cleanup with context cancellation
  3. Monitor resource usage to adjust worker count dynamically
  4. Handle errors gracefully in worker functions
  5. Set appropriate timeouts to prevent hanging operations

Learn more with useful resources

  1. Go Concurrency Patterns - Rob Pike
  2. Go Concurrency Patterns: Context
  3. Go by Example: Channels