VOOZH about

URL: https://dzone.com/articles/understanding-concurrency-patterns-in-go

โ‡ฑ Understanding Concurrency Patterns in Go


Related

  1. DZone
  2. Coding
  3. Languages
  4. Understanding Concurrency Patterns in Go

Understanding Concurrency Patterns in Go

Explore Goโ€™s concurrency patterns like worker pools, fan-out/fan-in, and pipelines for task management and data processing.

Likes
Comment
Save
6.7K Views

Join the DZone community and get the full member experience.

Join For Free

Go, also known as Golang, has become a popular language for developing concurrent systems due to its simple yet powerful concurrency model. Concurrency is a first-class citizen in Go, making it easier to write programs that efficiently use multicore processors. This article explores essential concurrency patterns in Go, demonstrating how to leverage goroutines and channels to build efficient and maintainable concurrent applications.

The Basics of Concurrency in Go

Goroutines

A goroutine is a lightweight thread managed by the Go runtime. Goroutines are cheap to create and have a small memory footprint, allowing you to run thousands of them concurrently.

Go
package main

import (
 "fmt"
 "time"
)

func sayHello() {
 fmt.Println("Hello, Go!")
}

func main() {
 go sayHello() // Start a new goroutine
 time.Sleep(1 * time.Second) // Wait for the goroutine to finish
}


Channels

Channels are Go's way of allowing goroutines to communicate with each other and synchronize their execution. You can send values from one goroutine to another through channels.

Go
package main

import "fmt"

func main() {
 ch := make(chan string)

 go func() {
 ch <- "Hello from goroutine"
 }()

 msg := <-ch
 fmt.Println(msg)
}

Don't communicate by sharing memory; share memory by communicating. (R. Pike)

Common Concurrency Patterns

Worker Pool

Purpose

To manage a fixed number of worker units (goroutines) that handle a potentially large number of tasks, optimizing resource usage and processing efficiency.

Use Cases

  • Task processing: Handling a large number of tasks (e.g., file processing, web requests) with a controlled number of worker threads to avoid overwhelming the system.
  • Concurrency management: Limiting the number of concurrent operations to prevent excessive resource consumption.
  • Job scheduling: Distributing and balancing workloads across a set of worker threads to maintain efficient processing.

Example

Go
package main

import (
 "fmt"
 "sync"
 "time"
)

// Worker function processes jobs from the jobs channel and sends results to the results channel
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
 defer wg.Done()
 for job := range jobs {
 // Simulate processing the job
 fmt.Printf("Worker %d processing job %d\n", id, job)
 time.Sleep(time.Second) // Simulate a time-consuming task
 results <- job * 2
 }
}

func main() {
 const numJobs = 15
 const numWorkers = 3

 jobs := make(chan int, numJobs)
 results := make(chan int, numJobs)

 var wg sync.WaitGroup

 // Start workers
 for w := 1; w <= numWorkers; w++ {
 wg.Add(1)
 go worker(w, jobs, results, &wg)
 }

 // Send jobs to the jobs channel
 for j := 1; j <= numJobs; j++ {
 jobs <- j
 }
 close(jobs)

 // Wait for all workers to finish
 go func() {
 wg.Wait()
 close(results)
 }()

 // Collect and print results
 for result := range results {
 fmt.Println("Result:", result)
 }
}


Fan-In

Purpose

To merge multiple input channels or data streams into a single output channel, consolidating results from various sources.

Use Cases

  • Log aggregation: Combining log entries from multiple sources into a single logging system for centralized analysis.
  • Data merging: Aggregating data from various producers into a single stream for further processing or analysis.
  • Event collection: Collecting events from multiple sources into one channel for unified handling.

Example

Go
package main

import (
 "fmt"
)

// Function to merge multiple channels into one
func merge(channels ...<-chan int) <-chan int {
 var wg sync.WaitGroup
 merged := make(chan int)

 output := func(c <-chan int) {
 defer wg.Done()
 for n := range c {
 merged <- n
 }
 }

 wg.Add(len(channels))
 for _, c := range channels {
 go output(c)
 }

 go func() {
 wg.Wait()
 close(merged)
 }()
 
 return merged
}

func worker(id int, jobs <-chan int) <-chan int {
 results := make(chan int)
 go func() {
 defer close(results)
 for job := range jobs {
 // Simulate processing
 fmt.Printf("Worker %d processing job %d\n", id, job)
 results <- job * 2
 }
 }()
 return results
}

func main() {
 const numJobs = 5
 jobs := make(chan int, numJobs)

 // Start workers and collect their result channels
 workerChannels := make([]<-chan int, 0, 3)
 for w := 1; w <= 3; w++ {
 workerChannels = append(workerChannels, worker(w, jobs))
 }

 // Send jobs
 for j := 1; j <= numJobs; j++ {
 jobs <- j
 }
 close(jobs)

 // Merge results
 results := merge(workerChannels...)

 // Collect and print results
 for result := range results {
 fmt.Println("Result:", result)
 }
}


Fan-Out

Purpose

To distribute data or messages from a single source to multiple consumers, allowing each consumer to process the same data independently.

Use Cases

  • Broadcasting notifications: Sending notifications or updates to multiple subscribers or services simultaneously.
  • Data distribution: Delivering data to multiple components or services that each needs to process or act upon the same information.
  • Event handling: Emitting events to various handlers that perform different actions based on the event.

Example

Go
package main

import (
 "fmt"
 "sync"
 "time"
)

// Subscriber function simulates a subscriber receiving a notification
func subscriber(id int, notification string, wg *sync.WaitGroup) {
 defer wg.Done()
 // Simulate processing the notification
 time.Sleep(time.Millisecond * 100) // Simulate some delay
 fmt.Printf("Subscriber %d received notification: %s\n", id, notification)
}

func main() {
 // List of subscribers (represented by IDs)
 subscribers := []int{1, 2, 3, 4, 5}
 notification := "Important update available!"

 var wg sync.WaitGroup

 // Broadcast notification to all subscribers concurrently
 for _, sub := range subscribers {
 wg.Add(1)
 go subscriber(sub, notification, &wg)
 }

 // Wait for all subscribers to receive the notification
 wg.Wait()

 fmt.Println("All subscribers have received the notification.")
}


Generator

Purpose

To produce a sequence of data or events that can be consumed by other parts of a system.

Use Cases

  • Data streams: Generating a stream of data items, such as log entries or sensor readings, that are processed by other components.
  • Event emission: Emitting a series of events or notifications to be handled by event listeners or subscribers.
  • Data simulation: Creating simulated data for testing or demonstration purposes.

Example

Go
package main

import (
 "fmt"
 "time"
)

// Generator function that produces integers
func generator(start, end int) <-chan int {
 out := make(chan int)
 go func() {
 for i := start; i <= end; i++ {
 out <- i
 }
 close(out)
 }()
 return out
}

func main() {
 // Start the generator
 gen := generator(1, 10)

 // Consume the generated values
 for value := range gen {
 fmt.Println("Received:", value)
 }
}


Pipeline

Purpose

To process data through a series of stages, where each stage transforms or processes the data before passing it to the next stage.

Use Cases

  • Data transformation: Applying a sequence of transformations to data, such as filtering, mapping, and reducing.
  • Stream processing: Handling data streams in a step-by-step manner, where each step performs a specific operation on the data.
  • Complex processing workflows: Breaking down complex processing tasks into manageable stages, such as data ingestion, transformation, and output.

Example

Go
package main

import (
 "fmt"
)

func generator(nums ...int) <-chan int {
 out := make(chan int)
 go func() {
 for _, n := range nums {
 out <- n
 }
 close(out)
 }()
 return out
}

func sq(in <-chan int) <-chan int {
 out := make(chan int)
 go func() {
 for n := range in {
 out <- n * n
 }
 close(out)
 }()
 return out
}

func main() {
 c := generator(2, 3, 4)
 out := sq(c)

 for n := range out {
 fmt.Println(n)
 }
}


Conclusion

Understanding and utilizing concurrency patterns in Go can significantly enhance the performance and efficiency of your applications. The language's built-in support for goroutines and channels simplifies the process of managing concurrent execution, making it an excellent choice for developing high-performance systems.

You can fully utilize Go's concurrency model to build robust, scalable applications by mastering these patterns.

Go (programming language)

Published at DZone with permission of Suleiman Dibirov. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Event-Driven Pipelines With Apache Pulsar and Go
  • Clean Code: Concurrency Patterns, Context Management, and Goroutine Safety, Part 5
  • Clean Code: Package Architecture, Dependency Flow, and Scalability, Part 4
  • Clean Code: Interfaces in Go โ€” Why Small Is Beautiful, Part 3

Partner Resources

ร—

Comments

The likes didn't load as expected. Please refresh the page and try again.

Let's be friends: