Pipeline Pattern

The pipeline pattern is a powerful concurrency pattern in Go that allows you to process streams or batches of data through a series of stages. Each stage is connected to the next by channels, and each stage can run concurrently with others.

What is a Pipeline?#

A pipeline consists of:

  1. Source or Generator: Produces data for processing
  2. Stages: Each stage does some processing and passes results to the next stage
  3. Sink: The final stage that consumes the fully processed data

The key benefit is that data flows through the pipeline as soon as it's ready, without waiting for all data to be processed by each stage.
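
To make that streaming behaviour concrete, here is a tiny sketch (a fragment meant to live inside a `main` function with "fmt" imported) using a single unbuffered channel: because each send blocks until the receiver takes the value, the "produced" and "consumed" lines interleave instead of all values being produced first.

in := make(chan int)
go func() {
    defer close(in)
    for i := 1; i <= 3; i++ {
        fmt.Println("produced", i)
        in <- i // blocks until the consumer receives this value
    }
}()
for v := range in {
    fmt.Println("consumed", v)
}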

Visual Representation#

Here's a visual representation of a pipeline:

Generator      Stage 1      Stage 2      Stage 3
    |             |            |            |
    | --data-->   | --data-->  | --data-->  |
    |             |            |            |

Basic Pipeline Structure#

A pipeline stage typically:

  1. Receives values from an inbound channel
  2. Performs some computation on each received value
  3. Sends the results to an outbound channel
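
In code, most stages share the same generic shape: they take a receive-only inbound channel, return a receive-only outbound channel, and do their work in a goroutine that closes the output when finished. A minimal sketch (the identity transform stands in for the stage's real work):

// stage receives values on in, transforms each one, and forwards the
// result. It owns out, so it is the one that closes it.
func stage(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            result := v // replace with the stage's actual work on v
            out <- result
        }
    }()
    return out
}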

Simple Example: Square Numbers Pipeline#

Let's create a simple pipeline that squares numbers:

example.go
package main

import "fmt"

// generator creates a channel that emits the numbers in nums
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// square creates a channel that squares each received number
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // Set up the pipeline
    ch := generator(2, 3)
    out := square(ch)

    // Consume the output
    for n := range out {
        fmt.Println(n)
    }

    // Or set up and consume in one step
    /*
        for n := range square(generator(2, 3)) {
            fmt.Println(n)
        }
    */
}
 

This example demonstrates a pipeline with two stages:

  1. generator produces the initial numbers
  2. square squares each number

When you run this program, you'll see:

4
9

Composability of Pipelines#

One of the key benefits of pipelines is that they are composable. You can chain multiple stages together:

example.go
// double creates a channel that multiplies each received number by 2
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

// Now we can compose more complex pipelines
for n := range square(square(generator(2, 3))) {
    // This will square the numbers, then square the results
    fmt.Println(n) // Outputs: 16, 81
}

for n := range double(square(generator(2, 3))) {
    // This will square the numbers, then double the results
    fmt.Println(n) // Outputs: 8, 18
}
 

Multi-stage Pipeline Example#

Let's build a more complex pipeline that filters, maps, and reduces a sequence of numbers:

example.go
package main

import "fmt"

// generator produces numbers
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// filter returns only even numbers
func filter(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

// square squares numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// sum sums all numbers and returns the result
func sum(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        sum := 0
        for n := range in {
            sum += n
        }
        out <- sum
    }()
    return out
}

func main() {
    // Set up a pipeline that:
    // 1. Generates numbers 1 through 10
    // 2. Filters to keep only even numbers
    // 3. Squares the even numbers
    // 4. Sums the squared even numbers

    // Create the individual stages
    numbers := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    evenNumbers := filter(numbers)
    squaredNumbers := square(evenNumbers)
    sumChannel := sum(squaredNumbers)

    // Get the result
    fmt.Println(<-sumChannel) // Outputs: 220 (2² + 4² + 6² + 8² + 10²)

    // Or, compose the pipeline in one line
    result := <-sum(square(filter(generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))))
    fmt.Println(result) // Also outputs: 220
}
 

Fan-Out, Fan-In Pattern#

To process items in parallel, you can "fan out" the work to multiple goroutines, then "fan in" the results:

example.go
package main

import (
    "fmt"
    "sync"
    "time"
)

// Generator function from earlier examples
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// slowSquare is like square but has an artificial delay
func slowSquare(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            // Simulate a time-consuming operation
            time.Sleep(100 * time.Millisecond)
            out <- n * n
        }
    }()
    return out
}

// fanOut runs multiple instances of slowSquare in parallel,
// all reading from the same input channel
func fanOut(in <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        outputs[i] = slowSquare(in)
    }
    return outputs
}

// fanIn merges multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Forward values from each input channel to the output channel
    output := func(ch <-chan int) {
        defer wg.Done()
        for n := range ch {
            out <- n
        }
    }
    wg.Add(len(channels))
    for _, ch := range channels {
        go output(ch)
    }

    // Close the output channel when all input channels are done
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    // Generate some numbers
    in := generator(2, 3, 4, 5, 6, 7, 8, 9)

    // Fan out to 3 workers
    workers := fanOut(in, 3)

    // Fan in the results
    out := fanIn(workers...)

    // Collect the results (order is not guaranteed)
    var results []int
    for n := range out {
        results = append(results, n)
    }
    fmt.Println(results)
}
 

This pattern is particularly useful when each item is expensive to process, whether CPU-intensive work or slow I/O, since running multiple workers in parallel can significantly improve throughput.
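
One rough way to see the effect is to time the same workload with different worker counts. This sketch assumes the generator, slowSquare, fanOut, and fanIn functions above are in scope and that "time" is imported; with 8 inputs at roughly 100 ms each, one worker needs about 800 ms while three workers finish in about 300 ms.

for _, workers := range []int{1, 3} {
    start := time.Now()
    out := fanIn(fanOut(generator(2, 3, 4, 5, 6, 7, 8, 9), workers)...)
    for range out {
        // Drain the pipeline so all work completes
    }
    fmt.Printf("%d worker(s): %v\n", workers, time.Since(start))
}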

Stopping a Pipeline#

There are several ways to stop a pipeline:

1. Natural Completion#

The simplest way is to let the pipeline complete naturally. When a stage finishes processing all its input data, it closes its output channel, which signals to the next stage that no more data is coming.
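
With the square-numbers pipeline from earlier, this is the shutdown you get for free: once generator has sent its last value it closes its channel, square's range loop then ends and closes its own channel, and the consuming loop exits on its own.

// No explicit stop signal needed: the close propagates stage by stage
for n := range square(generator(1, 2, 3)) {
    fmt.Println(n)
}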

2. Using Context for Cancellation#

For long-running or infinite pipelines, you can use the context package to signal cancellation:

example.go
package main

import (
    "context"
    "fmt"
    "time"
)

// generator emits an endless stream of increasing numbers until
// the context is cancelled
func generator(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        n := 0
        for {
            select {
            case <-ctx.Done():
                // Context cancelled, stop generating
                return
            case out <- n:
                // Successfully sent, increment the counter
                n++
                // Simulate some work
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()
    return out
}

// square squares values from in until the context is cancelled
// or the input channel is closed
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                // Context cancelled, stop processing
                return
            case n, ok := <-in:
                if !ok {
                    // Input channel closed, we're done
                    return
                }
                select {
                case out <- n * n:
                    // Successfully sent the squared value
                case <-ctx.Done():
                    // Context cancelled
                    return
                }
            }
        }
    }()
    return out
}

func main() {
    // Create a context with cancellation
    ctx, cancel := context.WithCancel(context.Background())

    // Set up a pipeline that generates and squares numbers
    numbers := generator(ctx)
    squares := square(ctx, numbers)

    // Process 5 values, then cancel
    for i := 0; i < 5; i++ {
        fmt.Println(<-squares)
    }

    // Cancel the context to stop the pipeline
    cancel()

    // Give the goroutines a moment to clean up
    time.Sleep(time.Second)
    fmt.Println("Pipeline stopped")
}
 

Best Practices#

  1. Each stage should have its own goroutine: This allows all stages to run concurrently
  2. Stages should not know about other stages: They should only be concerned with their input and output channels
  3. The stage that creates a channel should be responsible for closing it: Follow the channel ownership pattern
  4. Handle cancellation: Provide ways to stop the pipeline early if needed
  5. Stream data, don't batch: Process data as soon as it's available rather than waiting for all data
  6. Consider buffered channels: For stages with different processing rates, buffered channels can improve throughput
  7. Handle errors: Pass errors through the pipeline so they can be handled appropriately (see the sketch after this list)
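
For the last two points, a common approach is to give a stage a small result type that carries a value together with its error, and to give the output channel a small buffer. The sketch below is illustrative rather than part of the earlier examples: parseInts and the buffer size of 4 are assumptions, and it relies on the standard strconv package.

// result pairs a value with the error produced while computing it
type result struct {
    n   int
    err error
}

// parseInts converts strings to ints, passing each error downstream
// instead of dropping it. The buffered channel lets this stage run a
// little ahead of a slower consumer.
func parseInts(in <-chan string) <-chan result {
    out := make(chan result, 4)
    go func() {
        defer close(out)
        for s := range in {
            n, err := strconv.Atoi(s)
            out <- result{n: n, err: err}
        }
    }()
    return out
}

The consuming stage then decides whether to skip, log, or abort on each error, which keeps that decision in one place instead of scattering it across stages.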

Summary#

  • The pipeline pattern processes data through a series of stages
  • Each stage runs in its own goroutine and communicates via channels
  • Pipelines are composable - you can combine them in different ways
  • Fan-out, fan-in patterns can be used for parallel processing
  • Context can be used to handle cancellation
  • Pipelines enable efficient processing of data streams

In the next lab, we'll explore the fan-out, fan-in pattern in more detail, which builds on the pipeline pattern to enable parallel processing.
