Go 并发模式实战
并发与并行的区别
在深入之前,先明确两个关键概念:
- 并发(Concurrency):同时处理多个任务的能力,关注的是结构
- 并行(Parallelism):同时执行多个任务,关注的是物理执行
Go 的并发模型建立在 goroutine 和 channel 之上,灵感来自 CSP(Communicating Sequential Processes)。
Goroutine 基础
go
func main() {
go sayHello() // 启动一个新的 goroutine
time.Sleep(1 * time.Second)
}
func sayHello() {
fmt.Println("Hello from goroutine!")
}go 关键字启动的 goroutine 是轻量级线程,初始栈只有几 KB,可以创建数十万个。
Channel 通信模式
1. 无缓冲 Channel(同步)
go
ch := make(chan string)
go func() {
ch <- "ping" // 阻塞直到有接收方
}()
msg := <-ch // 阻塞直到有发送方
fmt.Println(msg) // "ping"2. 带缓冲 Channel(异步)
go
ch := make(chan string, 3)
ch <- "a"
ch <- "b"
ch <- "c"
// ch <- "d" // 会阻塞,因为缓冲区满了3. Select 多路复用
go
select {
case msg := <-ch1:
fmt.Println("from ch1:", msg)
case msg := <-ch2:
fmt.Println("from ch2:", msg)
case <-time.After(1 * time.Second):
fmt.Println("timeout")
}常用并发模式
Fan-Out / Fan-In
go
func fanOut(input <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = worker(input)
}
return channels
}
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}Pipeline 模式
go
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}性能对比
| 模式 | 适用场景 | 复杂度 | 内存开销 |
|---|---|---|---|
| Mutex | 简单共享状态 | 低 | 极低 |
| Channel | 通信协调 | 中 | 低 |
| Fan-Out | CPU 密集 | 中 | 中 |
| Pipeline | 流式处理 | 高 | 中 |
最佳实践
不要通过共享内存来通信,而要通过通信来共享内存。 —— Go 并发哲学
这条原则是理解 Go 并发模型的关键。在实际项目中优先考虑 channel 通信,只在确实需要时才使用互斥锁。