Fan-Out Fan-In Pattern
The Fan-Out Fan-In pattern is a powerful concurrency pattern in Go that allows you to distribute work among multiple goroutines and then collect the results. This pattern is particularly useful for CPU-intensive tasks or workloads where parallelization can improve performance.
Understanding Fan-Out Fan-In
The pattern has two key components:
- Fan-Out: Distributing work across multiple goroutines to process data in parallel
- Fan-In: Combining the results from those goroutines into a single channel
This pattern builds on the pipeline pattern we explored earlier, but adds parallelism to one or more stages of the pipeline.
Visual Representation
Here's a visual representation of the Fan-Out Fan-In pattern:
```
               ┌─── Worker 1 ───┐
               │                │
Input ───────► ├─── Worker 2 ───┼───► Output
               │                │
               └─── Worker 3 ───┘
```
What is Fan-Out?
Fan-Out occurs when multiple goroutines read from the same channel, distributing the work among them. This is useful when:
- Processing each item is CPU-intensive
- Items can be processed independently of each other
- You want to utilize multiple CPU cores
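To see that fan-out hands each value to exactly one reader, here is a minimal sketch. `fanOutCounts` is a hypothetical helper, not part of the lab: several goroutines range over one shared channel, and each counts how many values it received. The per-worker split can vary from run to run, but the total always equals the number of items sent.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutCounts starts `workers` goroutines that all receive from the same
// jobs channel and reports how many items each worker handled.
// (Hypothetical helper for illustration only.)
func fanOutCounts(items []int, workers int) []int {
	jobs := make(chan int)
	counts := make([]int, workers) // each worker writes only its own index
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Every worker ranges over the SAME channel; each value is
			// delivered to exactly one of them.
			for range jobs {
				counts[id]++
			}
		}(w)
	}

	for _, it := range items {
		jobs <- it
	}
	close(jobs) // lets every worker's range loop finish
	wg.Wait()   // counts is safe to read after all workers are done
	return counts
}

func main() {
	counts := fanOutCounts([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 3)
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("per-worker counts:", counts, "total:", total)
}
```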
What is Fan-In?
Fan-In is the process of combining values from multiple channels into a single channel. This is done by:
- Creating an output channel
- Starting a goroutine for each input channel
- Having each goroutine forward values from its input channel to the output channel
- Closing the output channel once every forwarding goroutine has finished (typically tracked with a sync.WaitGroup)
Simple Example
Let's look at a simple implementation of the Fan-Out Fan-In pattern:
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// generator function from earlier examples
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square squares a number but runs slowly
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			// Simulate a CPU-intensive operation
			time.Sleep(100 * time.Millisecond)
			out <- n * n
		}
	}()
	return out
}

// merge combines multiple channels into one
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are done
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	start := time.Now()

	// Create a channel of inputs
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)

	// Fan-out to 4 square operations
	c1 := square(in)
	c2 := square(in)
	c3 := square(in)
	c4 := square(in)

	// Fan-in the results
	out := merge(c1, c2, c3, c4)

	// Consume the output
	var results []int
	for n := range out {
		results = append(results, n)
	}
	fmt.Println(results)
	fmt.Printf("Took: %v\n", time.Since(start))

	// Compare with sequential version
	start = time.Now()
	in = generator(2, 3, 4, 5, 6, 7, 8, 9)
	sequential := square(in)
	var seqResults []int
	for n := range sequential {
		seqResults = append(seqResults, n)
	}
	fmt.Println(seqResults)
	fmt.Printf("Sequential took: %v\n", time.Since(start))
}
```
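When you run this program, the parallel version should finish noticeably faster. The sequential run takes about 800 ms (8 items at 100 ms each), while four workers share the same 8 items and finish closer to 200 ms. Because time.Sleep only simulates work, sleeping goroutines overlap even on a single core; real CPU-bound work speeds up only as far as the available cores allow. The parallel results may also print in a different order on each run, since the workers finish independently.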
Dynamic Fan-Out
Rather than hard-coding the number of workers, we can dynamically create them:
```go
// fanOut runs multiple instances of fn in parallel, all reading from in
func fanOut(in <-chan int, n int, fn func(<-chan int) <-chan int) []<-chan int {
	cs := make([]<-chan int, n)
	for i := 0; i < n; i++ {
		cs[i] = fn(in)
	}
	return cs
}

func main() {
	// generator, square, and merge are the functions from the previous example
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)

	// Create 4 workers dynamically
	channels := fanOut(in, 4, square)

	// Fan-in the results
	out := merge(channels...)

	// Consume the results
	for n := range out {
		fmt.Println(n)
	}
}
```
Limiting the Number of Workers
For CPU-bound work, a common starting point is one worker per available CPU core, which you can look up at runtime:
```go
import (
	"fmt"
	"runtime"
)

func main() {
	numCPU := runtime.NumCPU()
	fmt.Printf("Using %d CPUs\n", numCPU)

	// generator, square, merge, and fanOut as defined above
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)

	// Create workers equal to the number of CPUs
	channels := fanOut(in, numCPU, square)

	// Fan-in the results
	out := merge(channels...)

	// Consume the results
	for n := range out {
		fmt.Println(n)
	}
}
```
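Bounded Fan-Out
Sometimes you want to limit the number of concurrent operations even if you have more CPU cores available, for example to cap memory use. A bounded fan-out controls the maximum number of concurrent operations. The version below starts one goroutine per input item, but a buffered channel acting as a semaphore ensures that at most limit of them run at the same time: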
```go
// boundedSquare limits the number of concurrent operations
func boundedSquare(in <-chan int, limit int) <-chan int {
	out := make(chan int)

	// Create a semaphore channel with the limit
	sem := make(chan struct{}, limit)

	// Use a WaitGroup to wait for all goroutines to finish
	var wg sync.WaitGroup

	// Process function
	process := func(n int) {
		defer func() {
			<-sem // Release the semaphore
			wg.Done()
		}()

		// Simulate CPU-intensive work
		time.Sleep(100 * time.Millisecond)
		out <- n * n
	}

	go func() {
		// For each input, acquire the semaphore before starting a goroutine
		for n := range in {
			wg.Add(1)
			sem <- struct{}{} // Acquire semaphore
			go process(n)
		}

		// Wait for all goroutines to finish and close the output channel
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	in := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// Process with a maximum of 3 concurrent operations
	out := boundedSquare(in, 3)

	// Consume the results
	for n := range out {
		fmt.Println(n)
	}
}
```
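To convince yourself that the semaphore really caps concurrency, you can count how many tasks are running at once. The sketch below uses the same acquire-before-`go` pattern as boundedSquare; `peakConcurrency` is a hypothetical helper, and it relies on `atomic.Int64`, which needs Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs `items` tasks through a semaphore of size `limit`
// and reports the highest number of tasks that were ever running at once.
func peakConcurrency(items, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for i := 0; i < items; i++ {
		wg.Add(1)
		sem <- struct{}{} // acquire: blocks while `limit` tasks are running
		go func() {
			defer func() {
				running.Add(-1) // decrement before releasing the slot
				<-sem           // release
				wg.Done()
			}()
			now := running.Add(1)
			// Record a new peak with a compare-and-swap loop.
			for {
				old := peak.Load()
				if now <= old || peak.CompareAndSwap(old, now) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulated work
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(20, 3))
}
```

The peak will usually equal the limit, but the only guarantee is that it never exceeds it.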
Ordered Results
One challenge with the Fan-Out Fan-In pattern is that results may come back in a different order than the inputs. If order matters, we need to track it:
```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

type Item struct {
	ID    int
	Value int
}

// generator from earlier examples
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// orderedSquare preserves order by tagging items with IDs
func orderedSquare(in <-chan int) <-chan Item {
	out := make(chan Item)
	go func() {
		var wg sync.WaitGroup
		id := 0
		for n := range in {
			// Tag each item with a sequential ID
			currentID := id
			id++

			// Simulate varying processing times
			delay := 100 * time.Millisecond
			if n%2 == 0 {
				delay = 200 * time.Millisecond
			}

			// Process in parallel while preserving IDs
			wg.Add(1)
			go func(id, n int, delay time.Duration) {
				defer wg.Done()
				time.Sleep(delay)
				out <- Item{ID: id, Value: n * n}
			}(currentID, n, delay)
		}

		// Close out only after every worker has sent its item;
		// closing it any earlier would make the workers panic on send.
		wg.Wait()
		close(out)
	}()
	return out
}

// collectOrdered collects and sorts results
func collectOrdered(in <-chan Item) []int {
	// Collect all items
	var items []Item
	for item := range in {
		items = append(items, item)
	}

	// Sort by ID
	sort.Slice(items, func(i, j int) bool {
		return items[i].ID < items[j].ID
	})

	// Extract the values in order
	result := make([]int, len(items))
	for i, item := range items {
		result[i] = item.Value
	}
	return result
}

func main() {
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)

	// Process with order preservation
	out := orderedSquare(in)

	// Collect and sort results
	results := collectOrdered(out)
	fmt.Println(results) // [4 9 16 25 36 49 64 81]
}
```
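collectOrdered has to wait for every result before it can return anything. If you would rather stream results in order as soon as possible, one option is a reordering buffer that holds early arrivals and releases each value once all lower IDs have been sent. The sketch below assumes IDs start at 0 and have no gaps, which matches how orderedSquare assigns them; `reorder` is a hypothetical name.

```go
package main

import "fmt"

// Item mirrors the type used above: a result tagged with its input position.
type Item struct {
	ID    int
	Value int
}

// reorder forwards values in ID order, buffering items that arrive early.
// Assumes IDs are 0, 1, 2, ... with no gaps.
func reorder(in <-chan Item) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		pending := make(map[int]int) // ID -> value for early arrivals
		next := 0                    // the ID we must emit next
		for item := range in {
			pending[item.ID] = item.Value
			// Flush every consecutive ID we now have.
			for {
				v, ok := pending[next]
				if !ok {
					break
				}
				out <- v
				delete(pending, next)
				next++
			}
		}
	}()
	return out
}

func main() {
	in := make(chan Item)
	go func() {
		defer close(in)
		// Deliberately out of order.
		for _, it := range []Item{{2, 16}, {0, 4}, {3, 25}, {1, 9}} {
			in <- it
		}
	}()
	for v := range reorder(in) {
		fmt.Println(v) // 4, 9, 16, 25 on separate lines
	}
}
```

The trade-off is memory: the buffer grows with how far ahead of the slowest item the other results get.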
Error Handling
A common way to handle errors in Fan-Out Fan-In is to send each error along with its result, wrapped in a single struct, and let the consumer decide what to do with it:
```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Result struct {
	Value int
	Err   error
}

// generator creates numbers
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// process simulates processing that might fail
func process(in <-chan int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for n := range in {
			// Simulate occasional failure
			if rand.Intn(10) < 2 { // 20% chance of failure
				out <- Result{Err: fmt.Errorf("failed to process %d", n)}
				continue
			}

			// Simulate processing time
			time.Sleep(100 * time.Millisecond)
			out <- Result{Value: n * n}
		}
	}()
	return out
}

// fanOut starts multiple processors
func fanOut(in <-chan int, n int) []<-chan Result {
	cs := make([]<-chan Result, n)
	for i := 0; i < n; i++ {
		cs[i] = process(in)
	}
	return cs
}

// merge combines multiple result channels
func merge(cs ...<-chan Result) <-chan Result {
	var wg sync.WaitGroup
	out := make(chan Result)

	output := func(c <-chan Result) {
		defer wg.Done()
		for r := range c {
			out <- r
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// No rand.Seed call: since Go 1.20 the global math/rand source
	// is seeded automatically.

	// Create input channel
	in := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// Fan out to multiple processors
	channels := fanOut(in, 4)

	// Merge results
	results := merge(channels...)

	// Process results, handling errors
	successful := 0
	failed := 0
	for r := range results {
		if r.Err != nil {
			fmt.Printf("Error: %v\n", r.Err)
			failed++
		} else {
			fmt.Printf("Result: %d\n", r.Value)
			successful++
		}
	}
	fmt.Printf("Processed: %d successful, %d failed\n", successful, failed)
}
```
Real-World Example: Image Processing
A common use case for Fan-Out Fan-In is processing a collection of images:
```go
package main

import (
	"fmt"
	"image"
	"image/color"
	"image/jpeg"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// ImageTask represents an image processing task
type ImageTask struct {
	InputPath  string
	OutputPath string
}

// ImageResult represents the result of processing
type ImageResult struct {
	Task  ImageTask
	Error error
}

// generateTasks creates image processing tasks for .jpg/.jpeg files
func generateTasks(inputDir, outputDir string, files []string) <-chan ImageTask {
	tasks := make(chan ImageTask)
	go func() {
		defer close(tasks)
		for _, file := range files {
			if filepath.Ext(file) != ".jpg" && filepath.Ext(file) != ".jpeg" {
				continue
			}
			inputPath := filepath.Join(inputDir, file)
			outputPath := filepath.Join(outputDir, "bw_"+file)
			tasks <- ImageTask{
				InputPath:  inputPath,
				OutputPath: outputPath,
			}
		}
	}()
	return tasks
}

// processImage converts each JPEG it receives to grayscale
func processImage(tasks <-chan ImageTask) <-chan ImageResult {
	results := make(chan ImageResult)
	go func() {
		defer close(results)
		for task := range tasks {
			// Open the input file
			inputFile, err := os.Open(task.InputPath)
			if err != nil {
				results <- ImageResult{Task: task, Error: err}
				continue
			}

			// Decode the image
			img, err := jpeg.Decode(inputFile)
			inputFile.Close()
			if err != nil {
				results <- ImageResult{Task: task, Error: err}
				continue
			}

			// Convert to grayscale pixel by pixel
			bounds := img.Bounds()
			grayImg := image.NewGray(bounds)
			for y := bounds.Min.Y; y < bounds.Max.Y; y++ {
				for x := bounds.Min.X; x < bounds.Max.X; x++ {
					c := color.GrayModel.Convert(img.At(x, y))
					grayImg.Set(x, y, c)
				}
			}

			// Create the output file
			outputFile, err := os.Create(task.OutputPath)
			if err != nil {
				results <- ImageResult{Task: task, Error: err}
				continue
			}

			// Encode the grayscale image; a failed Close can also lose data
			err = jpeg.Encode(outputFile, grayImg, nil)
			if closeErr := outputFile.Close(); err == nil {
				err = closeErr
			}
			results <- ImageResult{Task: task, Error: err} // Error is nil on success
		}
	}()
	return results
}

// merge combines multiple result channels
func merge(cs ...<-chan ImageResult) <-chan ImageResult {
	var wg sync.WaitGroup
	out := make(chan ImageResult)

	output := func(c <-chan ImageResult) {
		defer wg.Done()
		for r := range c {
			out <- r
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	inputDir := "./images"
	outputDir := "./images/bw"

	// Create output directory if it doesn't exist
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		fmt.Printf("Error creating output directory: %v\n", err)
		return
	}

	// Get all files from the input directory
	files, err := filepath.Glob(filepath.Join(inputDir, "*"))
	if err != nil {
		fmt.Printf("Error reading directory: %v\n", err)
		return
	}

	// Get just the filenames
	var filenames []string
	for _, file := range files {
		filenames = append(filenames, filepath.Base(file))
	}

	// Start timing before any worker can pick up a task
	start := time.Now()

	// Generate tasks
	tasks := generateTasks(inputDir, outputDir, filenames)

	// Fan out processing to multiple workers
	numWorkers := 4
	processors := make([]<-chan ImageResult, numWorkers)
	for i := 0; i < numWorkers; i++ {
		processors[i] = processImage(tasks)
	}

	// Merge results
	results := merge(processors...)

	// Process results
	successful := 0
	failed := 0
	for result := range results {
		if result.Error != nil {
			fmt.Printf("Error processing %s: %v\n", result.Task.InputPath, result.Error)
			failed++
		} else {
			fmt.Printf("Successfully processed %s -> %s\n",
				result.Task.InputPath, result.Task.OutputPath)
			successful++
		}
	}

	elapsed := time.Since(start)
	fmt.Printf("Processed %d images (%d successful, %d failed) in %v\n",
		successful+failed, successful, failed, elapsed)
}
```
Best Practices
- Match workers to CPUs: For CPU-bound tasks, one worker per CPU core (runtime.NumCPU) is a sensible default
- Consider I/O: For I/O-bound tasks, you might want more workers than CPU cores, since workers spend much of their time waiting
- Watch memory usage: Each worker may hold its own working data (a decoded image, for example), so balance parallelism against memory usage
- Handle errors properly: Pass errors through the pipeline to the consumer rather than logging or discarding them inside the workers
- Prefer bounded concurrency: Use a semaphore to limit the maximum number of concurrent operations
- Use context for cancellation: Implement cancellation using context for long-running operations
Summary
- The Fan-Out Fan-In pattern distributes work to multiple goroutines and collects the results
- This is ideal for CPU-intensive operations or operations that can be parallelized
- Fan-Out distributes work to multiple workers that read from the same channel
- Fan-In combines results from multiple workers into a single channel
- You can preserve order by tagging items with IDs
- A simple way to handle errors is to pass each error along with its result
- This pattern can significantly improve performance for suitable workloads
In the next lab, we'll learn about cancelling goroutines using the context package, which is important for managing the lifecycle of concurrent operations.