Fan-Out Fan-In Pattern

Part of Golang Mastery course

The Fan-Out Fan-In pattern is a powerful concurrency pattern in Go that allows you to distribute work among multiple goroutines and then collect the results. This pattern is particularly useful for CPU-intensive tasks or workloads where parallelization can improve performance.

Understanding Fan-Out Fan-In

The pattern has two key components:

  1. Fan-Out: Distributing work across multiple goroutines to process data in parallel
  2. Fan-In: Combining the results from those goroutines into a single channel

This pattern builds on the pipeline pattern we explored earlier, but adds parallelism to one or more stages of the pipeline.

Visual Representation

Here's a visual representation of the Fan-Out Fan-In pattern:

               ┌─── Worker 1 ───┐
               │                │
Input ───────► ├─── Worker 2 ───┼───► Output
               │                │
               └─── Worker 3 ───┘

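What is Fan-Out?

Fan-Out occurs when multiple goroutines receive from the same channel. Each value sent on the channel is delivered to exactly one of them, so the work is divided among the workers rather than duplicated. This is useful when: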

  • Processing each item is CPU-intensive
  • Items can be processed independently of each other
  • You want to utilize multiple CPU cores
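
To make the "one value, one receiver" behavior concrete, here is a minimal sketch. The helper name distribute and its signature are made up for this illustration and are not part of the lesson's code. It starts a few goroutines on one shared channel and counts how many items each one received.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute is a hypothetical helper: it starts `workers` goroutines that
// all receive from one shared channel and reports how many items each
// goroutine handled.
func distribute(items []int, workers int) []int {
	in := make(chan int)
	counts := make([]int, workers)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func(id int) {
			defer wg.Done()
			for range in {
				counts[id]++ // each goroutine writes only to its own slot
			}
		}(w)
	}

	// Every send is received by exactly one of the workers.
	for _, it := range items {
		in <- it
	}
	close(in)
	wg.Wait()
	return counts
}

func main() {
	counts := distribute([]int{1, 2, 3, 4, 5, 6, 7, 8}, 3)
	total := 0
	for id, c := range counts {
		fmt.Printf("worker %d handled %d items\n", id, c)
		total += c
	}
	// The per-worker split changes between runs; the total does not.
	fmt.Println("total:", total)
}
```

The total always equals the number of items sent, while the split between workers depends on scheduling.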

What is Fan-In?

Fan-In is the process of combining results from multiple channels into a single channel. This is done by:

  • Creating an output channel
  • Starting a goroutine for each input channel
  • Having each goroutine forward values from its input channel to the output channel
  • Closing the output channel when all input channels are done
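
The four steps above can be sketched once for any element type. This is only an illustration and assumes Go 1.18 or newer for type parameters; the names mergeAny and emit are hypothetical and do not appear elsewhere in this lesson.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeAny is a hypothetical generic fan-in: it forwards every value from
// each input channel to a single output channel.
func mergeAny[T any](cs ...<-chan T) <-chan T {
	out := make(chan T) // step 1: create the output channel
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		// step 2: one forwarding goroutine per input channel
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v // step 3: forward values to the output channel
			}
		}(c)
	}
	// step 4: close the output once every input channel is drained
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit is a small demo helper that sends vals on a new channel and closes it.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	sum := 0
	for v := range mergeAny(emit(1, 2), emit(3), emit(4, 5, 6)) {
		sum += v
	}
	fmt.Println(sum) // prints 21
}
```

The arrival order of the merged values is not defined, which is why the demo only prints their sum.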

Simple Example

Let's look at a simple implementation of the Fan-Out Fan-In pattern:

example.go
package main

import (
	"fmt"
	"sync"
	"time"
)

// generator function from earlier examples
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square squares a number but runs slowly
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			// Simulate a CPU-intensive operation
			time.Sleep(100 * time.Millisecond)
			out <- n * n
		}
	}()
	return out
}

// merge combines multiple channels into one
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	// Start an output goroutine for each input channel
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	// Start a goroutine to close out once all the output goroutines are done
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	start := time.Now()
	// Create a channel of inputs
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)
	// Fan-out to 4 square operations
	c1 := square(in)
	c2 := square(in)
	c3 := square(in)
	c4 := square(in)
	// Fan-in the results
	out := merge(c1, c2, c3, c4)
	// Consume the output
	var results []int
	for n := range out {
		results = append(results, n)
	}
	fmt.Println(results)
	fmt.Printf("Took: %v\n", time.Since(start))
	// Compare with sequential version
	start = time.Now()
	in = generator(2, 3, 4, 5, 6, 7, 8, 9)
	sequential := square(in)
	var seqResults []int
	for n := range sequential {
		seqResults = append(seqResults, n)
	}
	fmt.Println(seqResults)
	fmt.Printf("Sequential took: %v\n", time.Since(start))
}
 

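When you run this program, the fan-out version finishes in roughly 200 ms, while the sequential version takes roughly 800 ms: the same eight items at 100 ms each, spread across four workers instead of one. The fan-out results can also arrive in a different order than the inputs, whereas the sequential results stay in order.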

Dynamic Fan-Out

Rather than hard-coding the number of workers, we can create them in a loop:

example.go
// fanOut runs n instances of fn in parallel, all reading from the same input channel
func fanOut(in <-chan int, n int, fn func(<-chan int) <-chan int) []<-chan int {
	cs := make([]<-chan int, n)
	for i := 0; i < n; i++ {
		cs[i] = fn(in)
	}
	return cs
}

func main() {
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)
	// Create 4 workers dynamically
	channels := fanOut(in, 4, square)
	// Fan-in the results
	out := merge(channels...)
	// Consume the results
	for n := range out {
		fmt.Println(n)
	}
}
 

Limiting the Number of Workers

For CPU-bound work, a good starting point is one worker per available CPU core. runtime.NumCPU reports the number of logical CPUs usable by the current process, so you can determine this at runtime:

example.go
import (
	"fmt"
	"runtime"
)

func main() {
	numCPU := runtime.NumCPU()
	fmt.Printf("Using %d CPUs\n", numCPU)
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)
	// Create workers equal to the number of CPUs
	channels := fanOut(in, numCPU, square)
	// Fan-in the results
	out := merge(channels...)
	// Consume the results
	for n := range out {
		fmt.Println(n)
	}
}
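
One refinement worth considering: the Go scheduler runs at most runtime.GOMAXPROCS(0) goroutines in parallel, and that value can be lower than the CPU count (for example when the GOMAXPROCS environment variable is set). It also rarely helps to start more workers than there are items. The sketch below shows one way to combine both limits; workerCount is a hypothetical helper, not something the runtime package provides.

```go
package main

import (
	"fmt"
	"runtime"
)

// workerCount is a hypothetical helper that picks a worker count for
// CPU-bound work: no more than the scheduler can run in parallel, no more
// than the number of items, and never fewer than one.
func workerCount(items int) int {
	n := runtime.GOMAXPROCS(0) // an argument of 0 only reads the current setting
	if items < n {
		n = items
	}
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println("logical CPUs:", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
	fmt.Println("workers for 3 items:", workerCount(3))
}
```

The result could be passed to fanOut in place of numCPU.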
 

Bounded Fan-Out

Sometimes you want to limit the number of concurrent operations, even if you have more CPU cores available. A bounded fan-out pattern controls the maximum number of concurrent operations. The version below uses a buffered channel as a counting semaphore, so a new goroutine starts only after a slot has been acquired:

example.go
// boundedSquare limits the number of concurrent operations
func boundedSquare(in <-chan int, limit int) <-chan int {
	out := make(chan int)
	// Create a semaphore channel with the limit
	sem := make(chan struct{}, limit)
	// Use a WaitGroup to wait for all goroutines to finish
	var wg sync.WaitGroup
	// Process function
	process := func(n int) {
		defer func() {
			<-sem // Release the semaphore
			wg.Done()
		}()
		// Simulate CPU-intensive work
		time.Sleep(100 * time.Millisecond)
		out <- n * n
	}
	go func() {
		// For each input, acquire the semaphore before starting a goroutine
		for n := range in {
			wg.Add(1)
			sem <- struct{}{} // Acquire semaphore (blocks while limit goroutines are running)
			go process(n)
		}
		// Wait for all goroutines to finish and close the output channel
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	// Process with a maximum of 3 concurrent operations
	out := boundedSquare(in, 3)
	// Consume the results
	for n := range out {
		fmt.Println(n)
	}
}
 

Ordered Results

One challenge with the Fan-Out Fan-In pattern is that results may come back in a different order than the inputs. If order matters, we need to track it:

example.go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

type Item struct {
	ID    int
	Value int
}

// generator function from earlier examples
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// orderedSquare preserves order by tagging items with IDs
func orderedSquare(in <-chan int) <-chan Item {
	out := make(chan Item)
	go func() {
		defer close(out)
		var wg sync.WaitGroup
		id := 0
		for n := range in {
			// Tag each item with a sequential ID
			currentID := id
			id++
			// Simulate varying processing times
			delay := 100 * time.Millisecond
			if n%2 == 0 {
				delay = 200 * time.Millisecond
			}
			// Process in parallel while preserving IDs
			wg.Add(1)
			go func(id, n int, delay time.Duration) {
				defer wg.Done()
				time.Sleep(delay)
				out <- Item{ID: id, Value: n * n}
			}(currentID, n, delay)
		}
		// Wait for every in-flight goroutine before the deferred close runs;
		// closing out earlier would make a pending send panic
		wg.Wait()
	}()
	return out
}

// collectOrdered collects and sorts results
func collectOrdered(in <-chan Item) []int {
	// Collect all items
	var items []Item
	for item := range in {
		items = append(items, item)
	}
	// Sort by ID
	sort.Slice(items, func(i, j int) bool {
		return items[i].ID < items[j].ID
	})
	// Extract the values in order
	result := make([]int, len(items))
	for i, item := range items {
		result[i] = item.Value
	}
	return result
}

func main() {
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)
	// Process with order preservation
	out := orderedSquare(in)
	// Collect and sort results
	results := collectOrdered(out)
	fmt.Println(results)
}
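
When all inputs are already in memory, sorting can be avoided altogether. The sketch below is an alternative to the ID-tagging approach, not a replacement for it: each job carries its input index and the worker writes the result straight into that position of a preallocated slice. The name squareInOrder and the job type are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// squareInOrder is a hypothetical helper that squares nums with a fixed
// number of workers and returns the results in input order.
func squareInOrder(nums []int, workers int) []int {
	if workers < 1 {
		workers = 1 // assumption: treat a non-positive count as one worker
	}
	type job struct{ idx, n int }
	jobs := make(chan job)
	results := make([]int, len(nums))

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for j := range jobs {
				// Each index is written by exactly one goroutine,
				// so no extra locking is needed.
				results[j.idx] = j.n * j.n
			}
		}()
	}

	for i, n := range nums {
		jobs <- job{idx: i, n: n}
	}
	close(jobs)
	wg.Wait() // all writes are finished before results is read
	return results
}

func main() {
	fmt.Println(squareInOrder([]int{2, 3, 4, 5, 6, 7, 8, 9}, 4))
	// prints [4 9 16 25 36 49 64 81]
}
```

The trade-off is that the caller only sees results once every item is done, whereas the channel-based version can stream them as they complete.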
 

Error Handling

Error handling in Fan-Out Fan-In requires passing errors along with results:

example.go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Result struct {
	Value int
	Err   error
}

// generator creates numbers
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// process simulates processing that might fail
func process(in <-chan int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for n := range in {
			// Simulate occasional failure
			if rand.Intn(10) < 2 { // 20% chance of failure
				out <- Result{Err: fmt.Errorf("failed to process %d", n)}
				continue
			}
			// Simulate processing time
			time.Sleep(100 * time.Millisecond)
			out <- Result{Value: n * n}
		}
	}()
	return out
}

// fanOut starts multiple processors
func fanOut(in <-chan int, n int) []<-chan Result {
	cs := make([]<-chan Result, n)
	for i := 0; i < n; i++ {
		cs[i] = process(in)
	}
	return cs
}

// merge combines multiple result channels
func merge(cs ...<-chan Result) <-chan Result {
	var wg sync.WaitGroup
	out := make(chan Result)
	output := func(c <-chan Result) {
		defer wg.Done()
		for r := range c {
			out <- r
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Only needed before Go 1.20; newer versions seed the global source automatically
	rand.Seed(time.Now().UnixNano())
	// Create input channel
	in := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	// Fan out to multiple processors
	channels := fanOut(in, 4)
	// Merge results
	results := merge(channels...)
	// Process results, handling errors
	successful := 0
	failed := 0
	for r := range results {
		if r.Err != nil {
			fmt.Printf("Error: %v\n", r.Err)
			failed++
		} else {
			fmt.Printf("Result: %d\n", r.Value)
			successful++
		}
	}
	fmt.Printf("Processed: %d successful, %d failed\n", successful, failed)
}
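
If the caller wants a single error value instead of printing each failure, the merged results can be drained into a slice of values plus one combined error. The sketch below assumes Go 1.20 or newer for errors.Join; collect and demoResults are hypothetical names, and the Result type is repeated only so the block compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Result mirrors the type used in the example above.
type Result struct {
	Value int
	Err   error
}

// collect is a hypothetical helper that drains a result channel and returns
// the successful values together with all failures joined into one error.
func collect(results <-chan Result) ([]int, error) {
	var values []int
	var errs []error
	for r := range results {
		if r.Err != nil {
			errs = append(errs, r.Err)
			continue
		}
		values = append(values, r.Value)
	}
	// errors.Join returns nil when there are no errors to join.
	return values, errors.Join(errs...)
}

// demoResults returns a closed channel holding two successes and one failure.
func demoResults() <-chan Result {
	ch := make(chan Result, 3)
	ch <- Result{Value: 4}
	ch <- Result{Err: fmt.Errorf("failed to process %d", 3)}
	ch <- Result{Value: 16}
	close(ch)
	return ch
}

func main() {
	values, err := collect(demoResults())
	fmt.Println(values) // prints [4 16]
	fmt.Println(err)    // prints failed to process 3
}
```

In the full program, collect would receive the channel returned by merge.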
 

Real-World Example: Image Processing

A common use case for Fan-Out Fan-In is processing a collection of images:

example.go
package main

import (
	"fmt"
	"image"
	"image/color"
	"image/jpeg"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// ImageTask represents an image processing task
type ImageTask struct {
	InputPath  string
	OutputPath string
}

// ImageResult represents the result of processing
type ImageResult struct {
	Task  ImageTask
	Error error
}

// generateTasks creates image processing tasks
func generateTasks(inputDir, outputDir string, files []string) <-chan ImageTask {
	tasks := make(chan ImageTask)
	go func() {
		defer close(tasks)
		for _, file := range files {
			// Skip anything that is not a JPEG file
			if filepath.Ext(file) != ".jpg" && filepath.Ext(file) != ".jpeg" {
				continue
			}
			inputPath := filepath.Join(inputDir, file)
			outputPath := filepath.Join(outputDir, "bw_"+file)
			tasks <- ImageTask{
				InputPath:  inputPath,
				OutputPath: outputPath,
			}
		}
	}()
	return tasks
}

// processImage converts each image it receives to grayscale
func processImage(tasks <-chan ImageTask) <-chan ImageResult {
	results := make(chan ImageResult)
	go func() {
		defer close(results)
		for task := range tasks {
			// Open the input file
			inputFile, err := os.Open(task.InputPath)
			if err != nil {
				results <- ImageResult{Task: task, Error: err}
				continue
			}
			// Decode the image
			img, err := jpeg.Decode(inputFile)
			inputFile.Close()
			if err != nil {
				results <- ImageResult{Task: task, Error: err}
				continue
			}
			// Convert to grayscale
			bounds := img.Bounds()
			grayImg := image.NewGray(bounds)
			for y := bounds.Min.Y; y < bounds.Max.Y; y++ {
				for x := bounds.Min.X; x < bounds.Max.X; x++ {
					c := color.GrayModel.Convert(img.At(x, y))
					grayImg.Set(x, y, c)
				}
			}
			// Create the output file
			outputFile, err := os.Create(task.OutputPath)
			if err != nil {
				results <- ImageResult{Task: task, Error: err}
				continue
			}
			// Encode the grayscale image; a failed Close also counts as a
			// failure because it can mean the data was not fully written
			err = jpeg.Encode(outputFile, grayImg, nil)
			if cerr := outputFile.Close(); err == nil {
				err = cerr
			}
			results <- ImageResult{Task: task, Error: err}
		}
	}()
	return results
}

// merge combines multiple result channels
func merge(cs ...<-chan ImageResult) <-chan ImageResult {
	var wg sync.WaitGroup
	out := make(chan ImageResult)
	output := func(c <-chan ImageResult) {
		defer wg.Done()
		for r := range c {
			out <- r
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	inputDir := "./images"
	outputDir := "./images/bw"
	// Create output directory if it doesn't exist
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		fmt.Printf("Error creating output directory: %v\n", err)
		return
	}
	// Get all files from the input directory
	files, err := filepath.Glob(filepath.Join(inputDir, "*"))
	if err != nil {
		fmt.Printf("Error reading directory: %v\n", err)
		return
	}
	// Get just the filenames
	var filenames []string
	for _, file := range files {
		filenames = append(filenames, filepath.Base(file))
	}
	// Start the clock before any worker begins processing
	start := time.Now()
	// Generate tasks
	tasks := generateTasks(inputDir, outputDir, filenames)
	// Fan out processing to multiple workers
	numWorkers := 4
	processors := make([]<-chan ImageResult, numWorkers)
	for i := 0; i < numWorkers; i++ {
		processors[i] = processImage(tasks)
	}
	// Merge results
	results := merge(processors...)
	// Process results
	successful := 0
	failed := 0
	for result := range results {
		if result.Error != nil {
			fmt.Printf("Error processing %s: %v\n", result.Task.InputPath, result.Error)
			failed++
		} else {
			fmt.Printf("Successfully processed %s -> %s\n",
				result.Task.InputPath, result.Task.OutputPath)
			successful++
		}
	}
	elapsed := time.Since(start)
	fmt.Printf("Processed %d images (%d successful, %d failed) in %v\n",
		successful+failed, successful, failed, elapsed)
}
 

Best Practices

  1. Match workers to CPUs: For CPU-bound tasks, create workers equal to the number of CPU cores
  2. Consider I/O: For I/O-bound tasks, you might want more workers than CPU cores
  3. Watch memory usage: Each worker consumes resources, so balance parallelism with memory usage
  4. Handle errors properly: Pass errors through the pipeline rather than handling them in workers
  5. Prefer bounded concurrency: Use a semaphore to limit the maximum number of concurrent operations
  6. Use context for cancellation: Implement cancellation using context for long-running operations
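
As a small preview of the last point, here is one possible shape for a cancellable worker stage. It is a sketch under assumptions rather than a definitive implementation: squareCtx, count, and firstSquares are hypothetical names. The key idea is that every send sits in a select next to ctx.Done(), so a goroutine blocked on a send can still exit once the consumer stops reading.

```go
package main

import (
	"context"
	"fmt"
)

// count is a hypothetical endless generator: it emits 1, 2, 3, ... until
// ctx is cancelled.
func count(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// squareCtx is a hypothetical cancellable variant of the square stage.
func squareCtx(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return // stop instead of blocking on a send nobody reads
			}
		}
	}()
	return out
}

// firstSquares reads k results (k >= 0) and then cancels, which lets both
// goroutines above return instead of leaking.
func firstSquares(k int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := squareCtx(ctx, count(ctx))
	got := make([]int, 0, k)
	for i := 0; i < k; i++ {
		got = append(got, <-out)
	}
	return got
}

func main() {
	fmt.Println(firstSquares(3)) // prints [1 4 9]
}
```

Without the ctx.Done() cases, both goroutines would stay blocked forever after the third value is read.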

Summary

  • The Fan-Out Fan-In pattern distributes work to multiple goroutines and collects the results
  • This is ideal for CPU-intensive operations or operations that can be parallelized
  • Fan-Out distributes work to multiple workers that read from the same channel
  • Fan-In combines results from multiple workers into a single channel
  • You can preserve order by tagging items with IDs
  • Error handling requires passing errors along with results
  • This pattern can significantly improve performance for suitable workloads

In the next lab, we'll learn about cancelling goroutines using the context package, which is important for managing the lifecycle of concurrent operations.
