Go并发-使用协程、通道和select

通道和阻塞

通道(channel)和协程(goroutine)是实现Go并发程序的两种机制。其中,通道又分为无缓冲通道和有缓冲通道两种,在编写实际的并发程序时,基本都使用异步模式的有缓冲通道。通道又可细分为支持读和写的双向通道,只读的通道,只写的通道三种。

通道阻塞场景

无论是有缓存通道、还是无缓冲通道都存在阻塞的情况。阻塞场景共4个,有缓存和无缓冲各2个。

无缓冲通道的特点是,发送的数据需要被读取后发送才会完成(同步),它阻塞场景是:

  • 通道中无数据,但执行读通道。
  • 通道中无数据,向通道写数据,但无协程读取。

有缓存通道的特点是,有缓存时可以向通道中写入数据后直接返回(异步),它阻塞场景是:

  • 通道缓存无数据,但执行读通道(接收数据)。
  • 通道缓存已经占满,向通道写数据(发送数据),但无协程读。

使用协程、通道和select

Go的select关键字可以让我们操作多个通道,将协程(goroutine),通道(channel)和select结合起来构成了Go的一个强大特性。

首先,我们来看一个简短代码,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"time"
"fmt"
)

func main() {
c1 := make(chan string, 1) //定义两个有缓冲通道,容量为1
c2 := make(chan string, 1)

go func() {
time.Sleep(time.Second * 1) //每隔1秒发送数据
c1 <- "name: xuchao"
}()

go func() {
time.Sleep(time.Second * 2) //每隔2秒发送数据
c2 <- "age: 25"
}()

for i:=0; i<2; i++ { //使用select来等待这两个通道的值,然后输出
select {
case msg1 := <- c1:
fmt.Println(msg1)
case msg2 := <- c2:
fmt.Println(msg2)
}
}
}

执行结果

1
2
3
# go run channel.go 
name: xuchao
age: 25

如我们所期望的,程序输出了正确的值。对于select语句而言,它不断地检测通道是否有值过来,一旦发现有值过来,立刻获取输出。

使用Select+超时实现无阻塞读写

select是执行选择操作的一个结构,它里面有一组case语句,它会执行其中无阻塞的那一个,如果都阻塞了,那就等待其中一个不阻塞,进而继续执行,它有一个default语句,该语句是永远不会阻塞的,我们可以借助它实现无阻塞的操作。

但使用default实现的无阻塞通道阻塞有一个缺陷:当通道不可读或写的时候,会即可返回。实际场景更多的需求是,我们希望尝试读一会数据,或者尝试写一会数据,如果实在没法读写再返回,程序继续做其它的事情。

使用定时器替代default可以解决这个问题,给通道增加读写数据的容忍时间,如果5s内无法读写,就即刻返回。示例代码修改一下会是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"time"
)

func main() {
c1 := make(chan string, 1) //定义两个有缓冲通道,容量分别为1
c2 := make(chan string, 1)

go func() { //定义一个协程
time.Sleep(time.Second * 1) //隔1秒发送数据
c1 <- "name: xuchao" //向c1通道发送数据
}()

go func() {
time.Sleep(time.Second * 6) //隔6秒发送数据
c2 <- "age: 25" //向c2通道发送数据
}()

for i := 0; i < 2; i++ { //使用select来获取这两个通道的值,然后输出
tm := time.NewTimer(time.Second * 5) //给通道创建容忍时间,如果5s内无法读写,就即刻返回
select {
case msg1 := <-c1: //接收c1通道数据(消费数据)
fmt.Println(msg1)
case msg2 := <-c2: //接收c2通道数据(消费数据)
fmt.Println(msg2)
case <-tm.C:
fmt.Println("send data timeout!")
}
}
}

结果就会变成超时返回:

1
2
3
# go run channel2.go 
name: xuchao
send data timeout!

总结

为什么Go语言对通道要限制长度而不提供无限长度的通道?
我们知道通道(channel)是在两个 goroutine间通信的桥梁。当数据提供方供给速度大于消费方数据处理速度时,如果通道不限制长度,那么内存将不断膨胀直到应用崩溃。因此,限制通道的长度有利于约束数据提供方的供给速度,才能正常地处理数据。

常用的select场景

  • 无阻塞的读、写通道。即使通道是带缓存的,也是存在阻塞的情况,使用select可以完美的解决阻塞读写。
  • 给某个请求/处理/操作,设置超时时间,一旦超时时间内无法完成,则停止处理。
  • select本色:多通道处理。

解决阻塞的2种办法

  • 使用select的default语句,在channel不可读写时,即可返回
  • 使用select+定时器,在超时时间内,channel不可读写,则返回(推荐方式)

记住,在for循环里不要使用select + time.After的组合,易造成内存泄漏,应当使用NewTimer来做定时器。当使用golang过程中,遇到性能和内存gc问题,都可以使用golang tool pprof来排查分析问题。

希望这篇文章对你开发Go并发程序有所启发。

0%