Core Concepts and Basic Patterns

Goroutines: Lightweight Execution Units

Goroutines are functions that run concurrently with other functions. They're created using the go keyword and are scheduled by the Go runtime. Unlike OS threads, goroutines are multiplexed onto a smaller number of OS threads, making them extremely lightweight.

package main

import (
    "fmt"
    "time"
)

// sayHello prints a greeting for the given name to standard output.
func sayHello(name string) {
	greeting := "Hello, " + name + "!"
	fmt.Println(greeting)
}

// main launches several greeter goroutines concurrently.
func main() {
	// One goroutine per name; they all run concurrently with main.
	names := []string{"Alice", "Bob", "Charlie"}
	for _, name := range names {
		go sayHello(name)
	}

	// Crude synchronization for demo purposes only: give the goroutines a
	// chance to run before main returns and the whole program exits.
	time.Sleep(100 * time.Millisecond)
}

Channels: Communication Primitives

Channels are typed conduits through which you can send and receive values. They provide a way to synchronize goroutines and pass data between them safely.

package main

import "fmt"

// main demonstrates synchronous hand-off over an unbuffered channel.
func main() {
	// Unbuffered channel: the send below blocks until main receives.
	messages := make(chan string)

	go func() {
		messages <- "Hello from goroutine"
	}()

	// The receive blocks until the goroutine's send is ready, so the
	// two goroutines rendezvous here.
	fmt.Println(<-messages)
}

Practical Concurrency Patterns

1. Worker Pool Pattern

The worker pool pattern is essential for handling a large number of tasks efficiently. It limits the number of concurrent operations while processing work items from a queue.

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker drains the jobs channel until it is closed, publishing one
// doubled result per job.
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(time.Second) // simulate a slow unit of work
		results <- job * 2
	}
}

// main runs a fixed-size worker pool over a bounded job queue.
func main() {
	const numJobs = 5
	const numWorkers = 3

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Start the pool; each worker loops until jobs is closed.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for id := 1; id <= numWorkers; id++ {
		go func(workerID int) {
			defer wg.Done()
			worker(workerID, jobs, results)
		}(id)
	}

	// Queue every job, then close so the workers' range loops terminate.
	for job := 1; job <= numJobs; job++ {
		jobs <- job
	}
	close(jobs)

	// Close results only after every worker has returned; this ends the
	// range loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Printf("Result: %d\n", r)
	}
}

2. Fan-Out/Fan-In Pattern

This pattern distributes work to multiple workers and then collects results back. It's particularly useful for parallel processing of independent tasks.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// producer pushes ten random values in [0, 100) onto numbers, pacing the
// sends 10ms apart, and signals completion through wg.
func producer(numbers chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()

	count := 0
	for count < 10 {
		numbers <- rand.Intn(100)
		time.Sleep(10 * time.Millisecond)
		count++
	}
}

// processor squares every value read from input and writes the result to
// output, returning (and signalling wg) when input is closed and drained.
func processor(id int, input <-chan int, output chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()

	for num := range input {
		delay := time.Duration(rand.Intn(50)) * time.Millisecond
		time.Sleep(delay) // simulate variable-cost work
		output <- num * num
	}
}

// main wires one producer to numWorkers processors (fan-out) and drains
// their combined output (fan-in).
//
// BUG FIX: the original shared a single WaitGroup between the producer and
// the processors. The goroutine that closed `numbers` waited on that shared
// group, but the processors cannot finish until `numbers` is closed — so
// after the buffered values drained, every goroutine was asleep and the
// runtime aborted with "all goroutines are asleep - deadlock!". Giving each
// stage its own WaitGroup lets each channel be closed as soon as its own
// senders are done.
func main() {
	const numWorkers = 3
	numbers := make(chan int, 10)
	results := make(chan int, 10)

	var producerWg, workerWg sync.WaitGroup

	// Stage 1: a single producer feeds the numbers channel.
	producerWg.Add(1)
	go producer(numbers, &producerWg)

	// Stage 2: fan out to numWorkers processors.
	for i := 1; i <= numWorkers; i++ {
		workerWg.Add(1)
		go processor(i, numbers, results, &workerWg)
	}

	// Close numbers once the producer alone is done, which lets the
	// processors' range loops terminate.
	go func() {
		producerWg.Wait()
		close(numbers)
	}()

	// Close results once every processor is done, ending the range below.
	go func() {
		workerWg.Wait()
		close(results)
	}()

	// Fan in: collect every processed value.
	for result := range results {
		fmt.Printf("Processed: %d\n", result)
	}
}

3. Pipeline Pattern

Pipelines organize goroutines in stages where each stage processes data and passes it to the next stage, creating a data processing flow.

package main

import (
	"fmt"
)

// gen converts a list of integers into a channel, closing the channel
// after the last value so downstream stages can range over it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq is a pipeline stage: it squares every value received on in and
// closes its output once in is exhausted.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

func main() {
    // Create the pipeline
    c := gen(2, 3, 4, 5)
    out := sq(sq(c))
    
    // Print results
    for result := range out {
        fmt.Println(result)
    }
}

Advanced Channel Techniques

Buffered vs Unbuffered Channels

Understanding the difference between buffered and unbuffered channels is crucial for effective concurrency:

| Channel Type | Buffer Size | Blocking Behavior           | Use Case              |
|--------------|-------------|-----------------------------|-----------------------|
| Unbuffered   | 0           | Both send and receive block | Synchronization       |
| Buffered     | > 0         | Send blocks only when full  | Flow control          |
| Nil          | N/A         | Both operations block       | Conditional execution |
package main

import "fmt"

// main contrasts unbuffered (synchronous) and buffered (asynchronous)
// channel behavior.
func main() {
	// Unbuffered: send and receive must rendezvous, so the receive below
	// waits for the goroutine's send.
	rendezvous := make(chan int)
	go func() {
		rendezvous <- 42
	}()
	fmt.Println(<-rendezvous) // Blocks until send

	// Buffered: up to two sends complete without any receiver waiting.
	queued := make(chan int, 2)
	queued <- 1
	queued <- 2
	fmt.Println(<-queued) // Non-blocking
	fmt.Println(<-queued) // Non-blocking
}

Select Statement for Multiplexing

The select statement allows you to wait on multiple channel operations, making it ideal for implementing timeouts and concurrent operations.

package main

import (
    "fmt"
    "time"
)

// main races two delayed senders against a timeout using select.
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	// Two senders with different delays.
	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "First message"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Second message"
	}()

	// select completes with whichever channel is ready first, or with the
	// timeout if neither delivers within 3 seconds.
	select {
	case m := <-ch1:
		fmt.Println("Received:", m)
	case m := <-ch2:
		fmt.Println("Received:", m)
	case <-time.After(3 * time.Second):
		fmt.Println("Timeout occurred")
	}
}

Best Practices and Common Pitfalls

Resource Management

Always ensure proper cleanup of goroutines and channels to prevent resource leaks:

package main

import (
    "context"
    "fmt"
    "time"
)

// worker processes values from ch until ctx is cancelled, then logs its
// shutdown and returns.
func worker(ctx context.Context, id int, ch chan int) {
	for {
		select {
		case v := <-ch:
			fmt.Printf("Worker %d processing %d\n", id, v)
			time.Sleep(100 * time.Millisecond) // simulate work per item
		case <-ctx.Done():
			fmt.Printf("Worker %d shutting down\n", id)
			return
		}
	}
}

// main fans work out to three context-aware workers, then shuts them down
// cleanly.
//
// BUG FIX: the original called cancel() and returned immediately, so the
// program could exit before the workers observed the cancellation (their
// "shutting down" messages raced process exit), and it relied on an
// arbitrary 2-second sleep for the work to drain. Here main joins the
// workers through a done channel, so every worker is guaranteed to have
// shut down before the program exits.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan int)

	// One done signal per worker lets main wait for a clean shutdown
	// without changing worker's signature.
	done := make(chan struct{})
	for i := 1; i <= 3; i++ {
		go func(id int) {
			worker(ctx, id, ch)
			done <- struct{}{}
		}(i)
	}

	// Send work; each send on the unbuffered channel completes only when
	// some worker has received the value.
	for i := 1; i <= 10; i++ {
		ch <- i
	}

	cancel() // signal workers to stop

	// Join: wait until every worker has returned.
	for i := 0; i < 3; i++ {
		<-done
	}
}

Error Handling in Concurrent Code

Proper error handling in concurrent scenarios requires careful consideration of channel closing and error propagation:

package main

import (
    "fmt"
    "sync"
)

// processWithErrorHandling consumes input until it is closed or a negative
// value appears; a negative value is reported on errors and stops further
// processing. Completion is signalled through wg either way.
func processWithErrorHandling(input <-chan int, errors chan<- error, wg *sync.WaitGroup) {
	defer wg.Done()

	for num := range input {
		if num >= 0 {
			fmt.Printf("Processing: %d\n", num)
			continue
		}
		// Propagate the failure to the error channel and stop.
		errors <- fmt.Errorf("negative number: %d", num)
		return
	}
}

// main demonstrates propagating an error out of a worker goroutine over a
// dedicated error channel.
func main() {
	input := make(chan int)
	errors := make(chan error)

	var wg sync.WaitGroup
	wg.Add(1)
	go processWithErrorHandling(input, errors, &wg)

	// Feed the processor, closing input when done so its range ends.
	go func() {
		defer close(input)
		for _, v := range []int{1, 2, -1} { // -1 triggers the error path
			input <- v
		}
	}()

	// Close errors once the processor has returned, ending the range below.
	go func() {
		wg.Wait()
		close(errors)
	}()

	// Report the first error, if any.
	for err := range errors {
		if err != nil {
			fmt.Printf("Error: %v\n", err)
			break
		}
	}
}

Learn more with the useful resources below.