
Go Concurrency Patterns: Goroutines and Channels for Effective Parallel Programming
Core Concepts and Basic Patterns
Goroutines: Lightweight Execution Units
Goroutines are functions that run concurrently with other functions. They're created using the go keyword and are scheduled by the Go runtime. Unlike OS threads, goroutines are multiplexed onto a smaller number of OS threads, making them extremely lightweight.
package main
import (
"fmt"
"time"
)
// sayHello writes a one-line greeting for name to standard output.
func sayHello(name string) {
	greeting := "Hello, " + name + "!"
	fmt.Println(greeting)
}
func main() {
// Launching multiple goroutines
go sayHello("Alice")
go sayHello("Bob")
go sayHello("Charlie")
// Give goroutines time to execute
time.Sleep(100 * time.Millisecond)
}Channels: Communication Primitives
Channels are typed conduits through which you can send and receive values. They provide a way to synchronize goroutines and pass data between them safely.
package main
import "fmt"
func main() {
// Create an unbuffered channel
ch := make(chan string)
// Launch a goroutine to send data
go func() {
ch <- "Hello from goroutine"
}()
// Receive data from channel
message := <-ch
fmt.Println(message)
}Practical Concurrency Patterns
1. Worker Pool Pattern
The worker pool pattern is essential for handling a large number of tasks efficiently. It limits the number of concurrent operations while processing work items from a queue.
package main
import (
"fmt"
"sync"
"time"
)
// worker pulls jobs from the jobs channel until it is closed. For each job
// it logs the job, simulates one second of work, and sends the job value
// doubled on results. id is used only to label the log output.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			// jobs was closed and drained: nothing left to do.
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(time.Second) // stand-in for real work
		results <- 2 * job
	}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// Start 3 workers
var wg sync.WaitGroup
for w := 1; w <= 3; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
worker(workerID, jobs, results)
}(w)
}
// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// Wait for all workers to finish
go func() {
wg.Wait()
close(results)
}()
// Collect results
for r := range results {
fmt.Printf("Result: %d\n", r)
}
}2. Fan-Out/Fan-In Pattern
This pattern distributes work to multiple workers and then collects results back. It's particularly useful for parallel processing of independent tasks.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func producer(numbers chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 10; i++ {
numbers <- rand.Intn(100)
time.Sleep(10 * time.Millisecond)
}
}
func processor(id int, input <-chan int, output chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for num := range input {
// Simulate some processing
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
output <- num * num
}
}
func main() {
const numWorkers = 3
numbers := make(chan int, 10)
results := make(chan int, 10)
var wg sync.WaitGroup
// Start producer
wg.Add(1)
go producer(numbers, &wg)
// Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go processor(i, numbers, results, &wg)
}
// Close input channel when producer finishes
go func() {
wg.Wait()
close(numbers)
}()
// Collect results
go func() {
wg.Wait()
close(results)
}()
// Print results
for result := range results {
fmt.Printf("Processed: %d\n", result)
}
}3. Pipeline Pattern
Pipelines organize goroutines in stages where each stage processes data and passes it to the next stage, creating a data processing flow.
package main
import (
"fmt"
"sync"
)
// gen is the source stage of the pipeline: it returns a channel on which
// the given nums are sent in order from a new goroutine, and which is
// closed after the last one.
func gen(nums ...int) <-chan int {
	stream := make(chan int)
	emit := func() {
		defer close(stream)
		for i := 0; i < len(nums); i++ {
			stream <- nums[i]
		}
	}
	go emit()
	return stream
}
// sq is a pipeline stage: it returns a channel that carries the square of
// every value received from in, in the same order, and is closed once in
// has been closed and drained.
func sq(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			v, open := <-in
			if !open {
				return
			}
			squares <- v * v
		}
	}()
	return squares
}
func main() {
// Create the pipeline
c := gen(2, 3, 4, 5)
out := sq(sq(c))
// Print results
for result := range out {
fmt.Println(result)
}
}Advanced Channel Techniques
Buffered vs Unbuffered Channels
Understanding the difference between buffered and unbuffered channels is crucial for effective concurrency:
| Channel Type | Buffer Size | Blocking Behavior | Use Case |
|---|---|---|---|
| Unbuffered | 0 | Both send/receive block | Synchronization |
| Buffered | > 0 | Send blocks only when full; receive blocks only when empty | Flow control |
| Nil | N/A | Both send and receive block forever | Conditional execution (e.g. disabling a `select` case) |
package main
import "fmt"
func main() {
// Unbuffered channel - synchronous
unbuffered := make(chan int)
go func() {
unbuffered <- 42
}()
fmt.Println(<-unbuffered) // Blocks until send
// Buffered channel - asynchronous up to buffer size
buffered := make(chan int, 2)
buffered <- 1
buffered <- 2
fmt.Println(<-buffered) // Non-blocking
fmt.Println(<-buffered) // Non-blocking
}Select Statement for Multiplexing
The select statement allows you to wait on multiple channel operations, making it ideal for implementing timeouts and concurrent operations.
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "First message"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Second message"
}()
// Wait for first message
select {
case msg1 := <-ch1:
fmt.Println("Received:", msg1)
case msg2 := <-ch2:
fmt.Println("Received:", msg2)
case <-time.After(3 * time.Second):
fmt.Println("Timeout occurred")
}
}Best Practices and Common Pitfalls
Resource Management
Always ensure proper cleanup of goroutines and channels to prevent resource leaks:
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int, ch chan int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d shutting down\n", id)
return
case value := <-ch:
fmt.Printf("Worker %d processing %d\n", id, value)
time.Sleep(100 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan int)
// Start workers
for i := 1; i <= 3; i++ {
go worker(ctx, i, ch)
}
// Send work
for i := 1; i <= 10; i++ {
ch <- i
}
time.Sleep(2 * time.Second)
cancel() // Signal workers to stop
}Error Handling in Concurrent Code
Proper error handling in concurrent scenarios requires careful consideration of channel closing and error propagation:
package main
import (
"fmt"
"sync"
)
func processWithErrorHandling(input <-chan int, errors chan<- error, wg *sync.WaitGroup) {
defer wg.Done()
for num := range input {
if num < 0 {
errors <- fmt.Errorf("negative number: %d", num)
return
}
fmt.Printf("Processing: %d\n", num)
}
}
// main feeds three numbers (the last one negative) to a processor
// goroutine and prints the first error the processor reports.
func main() {
	var wg sync.WaitGroup
	numbers := make(chan int)
	errCh := make(chan error)

	wg.Add(1)
	go processWithErrorHandling(numbers, errCh, &wg)

	// Feed the processor; numbers is closed when the feeder returns.
	go func() {
		defer close(numbers)
		for _, n := range []int{1, 2, -1} { // the final -1 triggers an error
			numbers <- n
		}
	}()

	// Close errCh once the processor has returned so the loop below ends
	// even when no error was reported.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Report the first non-nil error and stop.
	for err := range errCh {
		if err == nil {
			continue
		}
		fmt.Printf("Error: %v\n", err)
		break
	}
}