Pipeline Pattern
The pipeline pattern is a powerful concurrency pattern in Go that allows you to process streams or batches of data through a series of stages. Each stage is connected to the next by channels, and each stage can run concurrently with others.
What is a Pipeline?
A pipeline consists of:
- Source or Generator: Produces data for processing
- Stages: Each stage does some processing and passes results to the next stage
- Sink: The final stage that consumes the fully processed data
The key benefit is that data flows through the pipeline as soon as it's ready, without waiting for all data to be processed by each stage.
Visual Representation
Here's a visual representation of a pipeline:
```
Generator    Stage 1      Stage 2      Stage 3
    |           |            |            |
    | --data--> | --data-->  | --data-->  |
    |           |            |            |
```
Basic Pipeline Structure
A pipeline stage typically:
- Receives values from an inbound channel
- Performs some computation on each received value
- Sends the results to an outbound channel
Simple Example: Square Numbers Pipeline
Let's create a simple pipeline that squares numbers:
```go
package main

import "fmt"

// generator returns a channel that emits each of the numbers in nums,
// then closes the channel.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square returns a channel that emits the square of each number received from in.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// Set up the pipeline.
	ch := generator(2, 3)
	out := square(ch)

	// Consume the output.
	for n := range out {
		fmt.Println(n)
	}

	// Or set up and consume in one step:
	/*
		for n := range square(generator(2, 3)) {
			fmt.Println(n)
		}
	*/
}
```
This example demonstrates a pipeline with two stages:
- generator: produces the initial numbers
- square: squares each number
When you run this program, you'll see:
4
9
Composability of Pipelines
One of the key benefits of pipelines is that they are composable. You can chain multiple stages together:
```go
// double returns a channel that emits each number received from in, multiplied by 2.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

func main() {
	// generator and square are defined as in the previous example.
	// Now we can compose more complex pipelines.
	for n := range square(square(generator(2, 3))) {
		// Square the numbers, then square the results.
		fmt.Println(n) // Outputs: 16, 81
	}

	for n := range double(square(generator(2, 3))) {
		// Square the numbers, then double the results.
		fmt.Println(n) // Outputs: 8, 18
	}
}
```
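Because every stage here has the same shape, `func(<-chan int) <-chan int`, a list of stages can also be composed programmatically instead of by nesting calls. The `Stage` type and `chain` helper below are hypothetical names chosen for illustration; `generator`, `square`, and `double` are reproduced from the examples above so the program compiles on its own.

```go
package main

import "fmt"

// Stage is a hypothetical name for the common shape of an int pipeline stage.
type Stage func(<-chan int) <-chan int

// chain connects src to each stage in order and returns the final output channel,
// so chain(src, a, b) is equivalent to b(a(src)).
func chain(src <-chan int, stages ...Stage) <-chan int {
	out := src
	for _, s := range stages {
		out = s(out)
	}
	return out
}

func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

func main() {
	// Same result as double(square(generator(2, 3))).
	for n := range chain(generator(2, 3), square, double) {
		fmt.Println(n) // 8, then 18
	}
}
```

Plain functions like `square` can be passed where a `Stage` is expected because their type has the same underlying signature, which keeps the stages themselves unaware of the helper.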
Multi-stage Pipeline Example
Let's build a more complex pipeline that filters, maps, and reduces a sequence of numbers:
```go
package main

import "fmt"

// generator emits the given numbers, then closes its output channel.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// filter passes through only even numbers.
func filter(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if n%2 == 0 {
				out <- n
			}
		}
	}()
	return out
}

// square squares numbers.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// sum adds up all numbers from in and emits the total once in is closed.
func sum(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		total := 0
		for n := range in {
			total += n
		}
		out <- total
	}()
	return out
}

func main() {
	// Set up a pipeline that:
	// 1. Generates the numbers 1 through 10
	// 2. Filters to keep only even numbers
	// 3. Squares the even numbers
	// 4. Sums the squared even numbers

	// Create the individual stages.
	numbers := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	evenNumbers := filter(numbers)
	squaredNumbers := square(evenNumbers)
	sumChannel := sum(squaredNumbers)

	// Get the result.
	fmt.Println(<-sumChannel) // Outputs: 220 (2² + 4² + 6² + 8² + 10²)

	// Or compose the pipeline in one line.
	result := <-sum(square(filter(generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))))
	fmt.Println(result) // Also outputs: 220
}
```
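The `filter` and `sum` stages above hard-code their behavior. A sketch of a more reusable variant, using the hypothetical names `filterBy` and `sumAll`: `filterBy` takes the predicate as a parameter, and `sumAll` acts as a plain sink that drains its input and returns the total directly rather than through another channel.

```go
package main

import "fmt"

func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// filterBy is a hypothetical stage that forwards only the values for which keep returns true.
func filterBy(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if keep(n) {
				out <- n
			}
		}
	}()
	return out
}

// sumAll is a hypothetical sink: it blocks until in is closed and returns the total.
func sumAll(in <-chan int) int {
	total := 0
	for n := range in {
		total += n
	}
	return total
}

func main() {
	isEven := func(n int) bool { return n%2 == 0 }
	isOdd := func(n int) bool { return n%2 != 0 }

	fmt.Println(sumAll(filterBy(generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), isEven))) // 30
	fmt.Println(sumAll(filterBy(generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), isOdd)))  // 25
}
```

Since the final result is a single value, returning it from an ordinary function avoids one extra goroutine and channel; the trade-off is that `sumAll` blocks its caller until the pipeline finishes.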
Fan-Out, Fan-In Pattern
To process items in parallel, you can "fan out" the work to multiple goroutines, then "fan in" the results:
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// generator is the same function as in the earlier examples.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// slowSquare is like square but adds an artificial delay.
func slowSquare(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			// Simulate a time-consuming operation.
			time.Sleep(100 * time.Millisecond)
			out <- n * n
		}
	}()
	return out
}

// fanOut starts numWorkers instances of slowSquare, all reading from the same input channel.
func fanOut(in <-chan int, numWorkers int) []<-chan int {
	outputs := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		outputs[i] = slowSquare(in)
	}
	return outputs
}

// fanIn merges multiple channels into one.
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Forward values from one input channel to the output channel.
	output := func(ch <-chan int) {
		defer wg.Done()
		for n := range ch {
			out <- n
		}
	}

	wg.Add(len(channels))
	for _, ch := range channels {
		go output(ch)
	}

	// Close the output channel once all input channels are drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Generate some numbers.
	in := generator(2, 3, 4, 5, 6, 7, 8, 9)

	// Fan out to 3 workers.
	workers := fanOut(in, 3)

	// Fan in the results.
	out := fanIn(workers...)

	// Collect the results; their order depends on goroutine scheduling.
	var results []int
	for n := range out {
		results = append(results, n)
	}
	fmt.Println(results)
}
```
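This pattern is particularly useful when one stage is much slower than the others, whether because it is CPU-intensive or because it waits on I/O. In the example above, a single slowSquare would need about 800 ms for the eight inputs, while three workers finish in roughly 300 ms. For CPU-bound work, the speedup is limited by the number of available cores. Note that fan-in does not preserve input order.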
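Because the merged channel receives results in whatever order the workers finish, the collected slice is generally not in input order. When order matters, one common approach is to tag each item with its input position and write results into a pre-sized slice. A minimal sketch under those assumptions: `indexed` and `squareAllOrdered` are hypothetical names, the input is a slice (so the total count is known up front), and `numWorkers` is assumed to be at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// indexed pairs a value with its position in the original input.
type indexed struct {
	index, value int
}

// squareAllOrdered fans the squaring work out to numWorkers goroutines,
// fans the tagged results back in, and reassembles them in input order.
// It assumes numWorkers >= 1.
func squareAllOrdered(nums []int, numWorkers int) []int {
	jobs := make(chan indexed)
	results := make(chan indexed)

	// Source: emit every input together with its index.
	go func() {
		defer close(jobs)
		for i, n := range nums {
			jobs <- indexed{index: i, value: n}
		}
	}()

	// Fan out: every worker reads from the shared jobs channel.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for w := 0; w < numWorkers; w++ {
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- indexed{index: j.index, value: j.value * j.value}
			}
		}()
	}

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Fan in: place each result back at its original position.
	out := make([]int, len(nums))
	for r := range results {
		out[r.index] = r.value
	}
	return out
}

func main() {
	fmt.Println(squareAllOrdered([]int{2, 3, 4, 5, 6, 7, 8, 9}, 3))
	// [4 9 16 25 36 49 64 81]
}
```

The workers still finish in any order; only the final placement step restores it, so this trades a little memory (the result slice) for deterministic output.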
Stopping a Pipeline
There are several ways to stop a pipeline:
1. Natural Completion
The simplest way is to let the pipeline complete naturally. When a stage finishes processing all its input data, it closes its output channel, which signals to the next stage that no more data is coming.
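Closing is what lets a downstream `for ... range` loop terminate: once a channel is closed and drained, a receive returns the zero value immediately, and the two-value form reports `ok == false`. The small sketch below, built around a hypothetical `countTo` source, shows both behaviors.

```go
package main

import "fmt"

// countTo is a hypothetical source that emits 1..n and then closes its channel.
func countTo(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // signals "no more data" to the consumer
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

func main() {
	ch := countTo(3)

	// range stops by itself after the channel is closed and drained.
	for n := range ch {
		fmt.Println("got", n)
	}

	// Further receives do not block: they return the zero value and ok == false.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```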
2. Using Context for Cancellation
For long-running or infinite pipelines, you can use the context package to signal cancellation:
```go
package main

import (
	"context"
	"fmt"
	"time"
)

// generator emits 0, 1, 2, ... until ctx is cancelled.
func generator(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		n := 0
		for {
			select {
			case <-ctx.Done():
				// Context cancelled: stop generating.
				return
			case out <- n:
				// Sent successfully: increment the counter.
				n++
				// Simulate some work.
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()
	return out
}

// square squares each value from in until in is closed or ctx is cancelled.
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				// Context cancelled: stop processing.
				return
			case n, ok := <-in:
				if !ok {
					// Input channel closed: we're done.
					return
				}
				select {
				case out <- n * n:
					// Sent the squared value successfully.
				case <-ctx.Done():
					// Context cancelled while waiting to send.
					return
				}
			}
		}
	}()
	return out
}

func main() {
	// Create a context with cancellation.
	ctx, cancel := context.WithCancel(context.Background())

	// Set up a pipeline that generates and squares numbers.
	numbers := generator(ctx)
	squares := square(ctx, numbers)

	// Process 5 values, then cancel.
	for i := 0; i < 5; i++ {
		fmt.Println(<-squares)
	}

	// Cancel the context to stop the pipeline.
	cancel()

	// Give the goroutines a moment to clean up.
	time.Sleep(time.Second)
	fmt.Println("Pipeline stopped")
}
```
Best Practices
- Each stage should have its own goroutine: This allows all stages to run concurrently
- Stages should not know about other stages: They should only be concerned with their input and output channels
- The stage that creates a channel should be responsible for closing it: Follow the channel ownership pattern
- Handle cancellation: Provide ways to stop the pipeline early if needed
- Stream data, don't batch: Process data as soon as it's available rather than waiting for all data
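- Consider buffered channels: When adjacent stages run at uneven speeds, a small buffer lets a fast stage keep working through short bursts instead of blocking on every send; it does not help if one stage is consistently slower than the rest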
- Handle errors: Pass errors through the pipeline so they can be handled appropriately
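One common way to pass errors through a pipeline is to send a small struct that carries either a value or an error, so a bad item does not stop the whole pipeline and the sink decides how to react. A sketch under that assumption; `result`, `source`, `parse`, and the string-to-int conversion are illustrative choices, not a prescribed API.

```go
package main

import (
	"fmt"
	"strconv"
)

// result is a hypothetical envelope carrying either a value or the error produced for one input.
type result struct {
	Input string
	Value int
	Err   error
}

// source emits each input string, then closes its channel.
func source(inputs ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, s := range inputs {
			out <- s
		}
	}()
	return out
}

// parse converts each string to an int; failures travel downstream
// as errors instead of stopping the stage.
func parse(in <-chan string) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for s := range in {
			n, err := strconv.Atoi(s)
			out <- result{Input: s, Value: n, Err: err}
		}
	}()
	return out
}

// square squares successful values and passes errors through untouched.
func square(in <-chan result) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for r := range in {
			if r.Err == nil {
				r.Value *= r.Value
			}
			out <- r
		}
	}()
	return out
}

func main() {
	// The sink decides what to do with each error; here it just reports and skips it.
	for r := range square(parse(source("2", "x", "4"))) {
		if r.Err != nil {
			fmt.Println("skipping", r.Input, "error:", r.Err)
			continue
		}
		fmt.Println(r.Input, "->", r.Value)
	}
}
```

If a single error should abort everything instead, the sink can cancel a shared context (as in the cancellation example) as soon as it sees the first failure.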
Summary
- The pipeline pattern processes data through a series of stages
- Each stage runs in its own goroutine and communicates via channels
- Pipelines are composable - you can combine them in different ways
- Fan-out, fan-in patterns can be used for parallel processing
- Context can be used to handle cancellation
- Pipelines enable efficient processing of data streams
In the next lab, we'll explore the fan-out, fan-in pattern in more detail. It builds on the pipeline pattern to enable parallel processing.