The worker pool pattern lets you manage a fixed set of goroutines that process many tasks concurrently. It is particularly useful for I/O-bound workloads, or whenever you want to improve throughput by parallelizing work without spawning an unbounded number of goroutines. In this tutorial, we will build a worker pool that reads tasks from a channel and processes them concurrently.

Step 1: Setting Up the Project

Create a new directory for your Go project and initialize a new module:

mkdir go-worker-pool
cd go-worker-pool
go mod init go-worker-pool

Step 2: Defining the Worker Structure

We will define a Worker struct that represents a worker in our pool. Each worker will listen for tasks from a channel and process them.

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID int
}

type Worker struct {
    ID          int
    TaskChannel chan Task
    WaitGroup   *sync.WaitGroup
}

func (w *Worker) Start() {
    defer w.WaitGroup.Done()
    for task := range w.TaskChannel {
        fmt.Printf("Worker %d processing task %d\n", w.ID, task.ID)
        time.Sleep(1 * time.Second) // Simulate work
    }
}

Step 3: Creating the Worker Pool

Next, we will create a function to initialize a pool of workers. It takes the shared task channel as a parameter, starts the requested number of workers, and returns the WaitGroup so the caller can wait for every worker to finish.

func NewWorkerPool(numWorkers int, taskChannel chan Task) *sync.WaitGroup {
    var wg sync.WaitGroup

    for i := 1; i <= numWorkers; i++ {
        worker := &Worker{
            ID:          i,
            TaskChannel: taskChannel,
            WaitGroup:   &wg,
        }
        wg.Add(1)
        go worker.Start()
    }

    return &wg
}

Step 4: Adding Tasks to the Pool

We need a function to add tasks to the worker pool. This function sends tasks on the taskChannel and then closes it; closing the channel is what ends each worker's for ... range loop, signaling that no more work is coming.

func AddTasks(taskChannel chan Task, numTasks int) {
    for i := 1; i <= numTasks; i++ {
        taskChannel <- Task{ID: i}
    }
    close(taskChannel) // no more tasks: lets each worker's range loop finish
}

Step 5: Putting It All Together

Now, let’s create the main function to tie everything together. We create the task channel, start the worker pool on it, add the tasks, and then block on the WaitGroup until every worker has drained the channel and exited. Waiting on the WaitGroup is reliable; sleeping for a fixed duration is not.

func main() {
    numWorkers := 3
    numTasks := 10

    taskChannel := make(chan Task)
    wg := NewWorkerPool(numWorkers, taskChannel)

    AddTasks(taskChannel, numTasks)

    // Wait for all workers to finish processing
    wg.Wait()
    fmt.Println("All tasks have been processed")
}

Complete Code Example

Here is the complete code for the worker pool:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID int
}

type Worker struct {
    ID          int
    TaskChannel chan Task
    WaitGroup   *sync.WaitGroup
}

func (w *Worker) Start() {
    defer w.WaitGroup.Done()
    for task := range w.TaskChannel {
        fmt.Printf("Worker %d processing task %d\n", w.ID, task.ID)
        time.Sleep(1 * time.Second) // Simulate work
    }
}

func NewWorkerPool(numWorkers int, taskChannel chan Task) *sync.WaitGroup {
    var wg sync.WaitGroup

    for i := 1; i <= numWorkers; i++ {
        worker := &Worker{
            ID:          i,
            TaskChannel: taskChannel,
            WaitGroup:   &wg,
        }
        wg.Add(1)
        go worker.Start()
    }

    return &wg
}

func AddTasks(taskChannel chan Task, numTasks int) {
    for i := 1; i <= numTasks; i++ {
        taskChannel <- Task{ID: i}
    }
    close(taskChannel) // no more tasks: lets each worker's range loop finish
}

func main() {
    numWorkers := 3
    numTasks := 10

    taskChannel := make(chan Task)
    wg := NewWorkerPool(numWorkers, taskChannel)

    AddTasks(taskChannel, numTasks)

    // Wait for all workers to finish processing
    wg.Wait()
    fmt.Println("All tasks have been processed")
}

Best Practices

  1. Graceful Shutdown: Ensure that your workers can shut down gracefully, for example by closing the task channel or cancelling a context.Context to signal them to stop.
  2. Error Handling: Incorporate error handling in your task processing to avoid panics and ensure robustness.
  3. Dynamic Scaling: Consider implementing a mechanism to dynamically adjust the number of workers based on the workload.
