Don’t communicate by sharing memory, share memory by communicating。相信学过Go的同学都知道这句名言,可以说channel就是后边这句话的具体实现。channel是一个类型安全的循环队列,能够控制groutine在它上面读写消息的行为,比如阻塞某个groutine ,或者唤醒某个 groutine。
一个通道相当于一个先进先出(FIFO)的队列,各个元素值都是严格地按照发送的顺序排列的,先被发送通道的元素值一定会先被接收,一个左尖括号紧接着一个减号形象地代表了元素值的传输方向。
下面是创建几种不同的通道:
ch1 := make(chan int) // 无缓冲通道
ch2 := make(chan int, 3) // 有缓冲通道
ch3 := make(chan<- int, 1) // 单向通道:只能发送不能接收
ch4 := make(<-chan int, 1) // 单向通道:只能接收不能发送
下面举一个简单的示例:
func main() {
done := make(chan struct{})
c := make(chan string)
go func() {
s := <-c // 接收消息
println(s)
close(done) // 关闭通道,作为结束通知
}()
c <- "lvmenglou" // 发送消息
<-done // 阻塞,知道有数据或者通道关闭
}
//最后输出:lvmenglou
通道发送和接收操作基本特性:
7.2.1 数据结构
channel的数据结构如下:
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写入时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}
chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的,下图展示了一个可缓存6个元素的channel示意图:
下图展示了一个没有缓冲区的channel,有几个goroutine阻塞等待读数据:
7.2.2 发送
向一个channel中写数据简单过程如下:
7.2.3 接收
从一个channel读数据简单过程如下:
7.2.4 关闭
关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。
7.3.1 发送
阻塞情况:
ch := make(chan int, 2)
ch = nil
ch <- 4 // all goroutines are asleep - deadlock!
重要知识点:
ch := make(chan int, 2)
ch <- 4
close(ch)
ch <- 3 // panic: send on closed channel
7.3.2 接收
阻塞情况:
重要知识点:
c := make(chan int, 3)
c <- 11
c <- 12
close(c)
for i := 0; i < cap(c)+1; i++ {
x, ok := <-c
println(i, ":", ok, x)
}
// 输出
// 0: true 11
// 1: true 12
// 2: false 0
// 3: false 0
7.3.3 关闭
重要知识点:
ch := make(chan int, 2)
ch <- 4
close(ch)
close(ch) // panic: close of closed channel
7.3.4 for-range读取
我们常常会用for-range来读取channel的数据
ch := make(chan int, 1)
go func(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}(ch)
for val := range ch {
fmt.Println(val)
}
重要知识点:
7.3.5 select
select是跟channel关系最亲密的语句,它是被专门设计出来处理通道的,因为每个 case 后面跟的都是通道表达式,可以是读,也可以是写。下面看一个简单的示例:
// 准备好几个通道。
intChannels := [3]chan int{
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
}
// 随机选择一个通道,并向它发送元素值。
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index
// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)
default:
fmt.Println("No candidate case is selected!")
}
我们用一个包含了三个候选分支的select语句,分别尝试从上述三个通道中接收元素值,哪一个通道中有值,哪一个对应的候选分支就会被执行。后面还有一个默认分支,不过在这里它是不可能被选中的。
在使用select语句的时候,我们需要注意下面几个事情:
intChan := make(chan int, 1)
// 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
close(intChan)
})
select {
case _, ok := <-intChan:
if !ok {
fmt.Println("The candidate case is closed.")
break
}
fmt.Println("The candidate case is selected.")
}
上面的知识需要牢记,面试常考,下面是讲解select执行的流程:
上面写的有些多,简单总结一下:执行select时,会从左到右,从上到下,对每个case表达式求值,当所有case求值完毕后,会挑选满足的case执行,如果有多条都满足,就随机选择一条;如果都没有满足,就执行default;如果连default都没有,就阻塞住,等有满足条件的case出现时,再执行。
关于channel,零碎的知识点非常多,我还是想通过一个完整的示例,将这些知识点全部串起来,下面就以海外商城Push为例,将上面知识应用到实际场景中。
7.4.1 示例介绍
海外商城需要对W个业务方发送Push,针对每个业务方,为了提高Push的并发能力,采用N个协程从EMQ中读取数据(EMQ中都一个消息队列,里面缓存了大量的Push数据),数据读取后进行处理,然后将处理后的数据写到channel中。同时,服务有M个协程从channel中取出数据并消费,然后通过小米Push SDK,给用户发送Push。整体发送链路如下:
在看后面的内容前,我先抛出几个问题:
7.4.2 初始化
初始化channel数组,数组里面是每个业务方appTypes的channel,channel的缓存区大小为30,并启动10个消费者协程:
var (
messageChan map[string]chan *WorkMessage // channel
stopMasterChan chan bool // 消费者结束通知
appTypes = map[int32]string{1: "shop", 2: "bbs", 3: "sharesave"}
)
func initPushChannel() {
maxSize = 30 // channel缓存区大小
workNum = 10 // goroutine个数
stopMasterChan = make(chan bool)
messageChan = make(map[string]chan *WorkMessage)
for _, name := range appTypes {
workChan := make(chan *WorkMessage, maxSize)
messageChan[name] = workChan
for i := 0; i < workNum; i++ {
go startMaster(name, workChan) // 启动消费者协程
}
}
}
func startMaster(name string, workChan chan *WorkMessage) {
for {
if exit := dostartMaster(name, workChan); exit {
return
}
}
}
初始化EMQ的Client,并启动10个生产者协程:
var (
clientFactory client.ClientFactory // EMQ Client
stopChan chan bool // 生产者结束通知
)
func initEmq() {
// 初始化EMQ的Client和单次读取数据条数,该处代码省略...
maxConsumerNum := 10
stopChan = make(chan bool)
for i := 0; i < maxConsumerNum; i++ {
go receiveMsg(i) // 启动生产者协程
}
}
func receiveMsg(queueID int) {
for {
if exit := doReceiveMsg(queueID); exit {
logz.Info("stop receive msg ...", logz.F("queueID", queueID))
return
}
}
}
主方法调用:
func InitWorker() {
// 初始化push SDK,逻辑省略...
initPushChannel() // 初始化Channel,启动消费者
initEmq() // 启动生产者
}
func doReceiveMsg(queueID int) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.")
}
}()
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
// 1. 从EMQ获取数据List,逻辑省略...
// 2. 遍历List,获取业务类型,逻辑省略...
// 3. 根据业务类型,获取对应的channel
name := "sharesave" // 示例数据
pushChannel, _ := messageChan[name]
// 4. 构造Push数据,然后放入channel
pushData := &WorkMessage{AppLocal: "id", AppType: 1} // 示例数据
pushChannel <- pushData
case <-stopChan:
println("stop to send data to channel.")
return true
}
}
}
这部分代码我做了大量简化,这里主要做了2件事情:
func dostartMaster(name string, workChan chan *WorkMessage) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.")
}
}()
for {
select {
case t := <-workChan:
if t != nil {
for _, message := range t.PushMessages {
// 接受channel数据t,将数据推给Push SDK
// 逻辑省略...
}
}
case <-stopMasterChan:
println("stop to get data from channel.")
return true
}
}
}
这部分代码同样做了大量简化,这里主要做了2件事情:
// 通知生产者协程关闭,协程不再写channel
func stopRecvMsgFromQueue() {
close(stopChan)
}
// 通知消费者协程关闭,协程不再读channel,并关闭channel,消费完channel中剩余消息
func stopPushChannel() {
close(stopMasterChan)
time.Sleep(time.Second)
for _, c := range messageChan {
close(c)
for msg := range c {
if msg != nil {
for _, message := range msg.PushMessages {
// 接受channel数据t,将数据推给Push SDK
// 逻辑省略...
}
}
}
}
}
// 主方法调用
func StopWorker() {
stopRecvMsgFromQueue()
time.Sleep(time.Second * 2)
stopPushChannel()
}
比如服务重启,需要关闭协程时,主要做以下事情:
这里有两个地方sleep了一下,分别有以下作用:
本章基本都是干货,上面总结的比较全面,这里就不再重复了,如果你能回答我提的这些问题,你应该就掌握了本章的内容:
最后就是Push的并发示例,强烈建议大家能掌握,掌握了这个示例,后续你应该也能很容易通过channel实现数据共享,并结合goroutine写出你自己的高并发程序。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8