Go语言核心手册-7.通道

337次阅读  |  发布于2年以前

Don’t communicate by sharing memory, share memory by communicating。相信学过Go的同学都知道这句名言,可以说channel就是后边这句话的具体实现。channel是一个类型安全的循环队列,能够控制groutine在它上面读写消息的行为,比如阻塞某个groutine ,或者唤醒某个 groutine。

7.1 基本特征

一个通道相当于一个先进先出(FIFO)的队列,各个元素值都是严格地按照发送的顺序排列的,先被发送通道的元素值一定会先被接收,一个左尖括号紧接着一个减号形象地代表了元素值的传输方向。

下面是创建几种不同的通道:


ch1 := make(chan int)      // 无缓冲通道
ch2 := make(chan int, 3)   // 有缓冲通道
ch3 := make(chan<- int, 1) // 单向通道:只能发送不能接收
ch4 := make(<-chan int, 1) // 单向通道:只能接收不能发送

下面举一个简单的示例:


func main() {
    done := make(chan struct{})
    c := make(chan string)
    go func() {
        s := <-c     // 接收消息
        println(s)
        close(done)  // 关闭通道,作为结束通知
    }()
    c <- "lvmenglou" // 发送消息
    <-done           // 阻塞,知道有数据或者通道关闭
}
//最后输出:lvmenglou

通道发送和接收操作基本特性:

7.2 底层原理

7.2.1 数据结构

channel的数据结构如下:

type hchan struct { 
    qcount uint // 当前队列中剩余元素个数 
    dataqsiz uint // 环形队列长度,即可以存放的元素个数 
    buf unsafe.Pointer // 环形队列指针 
    elemsize uint16 // 每个元素的大小 
    closed uint32 // 标识关闭状态 
    elemtype *_type // 元素类型 
    sendx uint // 队列下标,指示元素写入时存放到队列中的位置 
    recvx uint // 队列下标,指示元素从队列的该位置读出 
    recvq waitq // 等待读消息的goroutine队列 
    sendq waitq // 等待写消息的goroutine队列 
    lock mutex // 互斥锁,chan不允许并发读写 
}

chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的,下图展示了一个可缓存6个元素的channel示意图:

下图展示了一个没有缓冲区的channel,有几个goroutine阻塞等待读数据:

7.2.2 发送

向一个channel中写数据简单过程如下:

7.2.3 接收

从一个channel读数据简单过程如下:

7.2.4 关闭

关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。

7.3 核心知识

7.3.1 发送

阻塞情况:


ch := make(chan int, 2)
ch = nil
ch <- 4  // all goroutines are asleep - deadlock!

重要知识点:

ch := make(chan int, 2)
ch <- 4
close(ch)
ch <- 3 // panic: send on closed channel

7.3.2 接收

阻塞情况:

重要知识点:


c := make(chan int, 3)
c <- 11
c <- 12
close(c)
for i := 0; i < cap(c)+1; i++ {
    x, ok := <-c
    println(i, ":", ok, x)
}
// 输出
// 0: true 11
// 1: true 12
// 2: false 0
// 3: false 0

7.3.3 关闭

重要知识点:

ch := make(chan int, 2)
ch <- 4
close(ch)
close(ch) // panic: close of closed channel

7.3.4 for-range读取

我们常常会用for-range来读取channel的数据


ch := make(chan int, 1)
go func(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}(ch)
for val := range ch {
    fmt.Println(val)
}

重要知识点:

7.3.5 select

select是跟channel关系最亲密的语句,它是被专门设计出来处理通道的,因为每个 case 后面跟的都是通道表达式,可以是读,也可以是写。下面看一个简单的示例:


// 准备好几个通道。
intChannels := [3]chan int{
  make(chan int, 1),
  make(chan int, 1),
  make(chan int, 1),
}
// 随机选择一个通道,并向它发送元素值。
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index
// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
  fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
  fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
  fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)
default:
  fmt.Println("No candidate case is selected!")
}

我们用一个包含了三个候选分支的select语句,分别尝试从上述三个通道中接收元素值,哪一个通道中有值,哪一个对应的候选分支就会被执行。后面还有一个默认分支,不过在这里它是不可能被选中的。

在使用select语句的时候,我们需要注意下面几个事情:

intChan := make(chan int, 1)
// 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
  close(intChan)
})
select {
case _, ok := <-intChan:
  if !ok {
    fmt.Println("The candidate case is closed.")
    break
  }
  fmt.Println("The candidate case is selected.")
}

上面的知识需要牢记,面试常考,下面是讲解select执行的流程:

  1. 对于每一个case表达式,都至少会包含一个代表发送操作的发送表达式或者一个代表接收操作的接收表达式,同时也可能会包含其他的表达式。比如,如果case表达式是包含了接收表达式的短变量声明时,那么在赋值符号左边的就可以是一个或两个表达式,不过此处的表达式的结果必须是可以被赋值的。当这样的case表达式被求值时,它包含的多个表达式总会以从左到右的顺序被求值。
  2. select语句包含的候选分支中的case表达式都会在该语句执行开始时先被求值,并且求值的顺序是依从代码编写的顺序从上到下的。结合上一条规则,在select语句开始执行时,排在最上边的候选分支中最左边的表达式会最先被求值,然后是它右边的表达式。仅当最上边的候选分支中的所有表达式都被求值完毕后,从上边数第二个候选分支中的表达式才会被求值,顺序同样是从左到右,然后是第三个候选分支、第四个候选分支,以此类推。
  3. 对于每一个case表达式,如果其中的发送表达式或者接收表达式在被求值时,相应的操作正处于阻塞状态,那么对该case表达式的求值就是不成功的。在这种情况下,我们可以说,这个case表达式所在的候选分支是不满足选择条件的。
  4. 仅当select语句中的所有case表达式都被求值完毕后,它才会开始选择候选分支。这时候,它只会挑选满足选择条件的候选分支执行。如果所有的候选分支都不满足选择条件,那么默认分支就会被执行。如果这时没有默认分支,那么select语句就会立即进入阻塞状态,直到至少有一个候选分支满足选择条件为止。一旦有一个候选分支满足选择条件,select语句(或者说它所在的goroutine)就会被唤醒,这个候选分支就会被执行。
  5. 如果select语句发现同时有多个候选分支满足选择条件,那么它就会用一种伪随机的算法在这些分支中选择一个并执行。注意,即使select语句是在被唤醒时发现的这种情况,也会这样做。
  6. 一条select语句中只能够有一个默认分支。并且,默认分支只在无候选分支可选时才会被执行,这与它的编写位置无关。
  7. select语句的每次执行,包括case表达式求值和分支选择,都是独立的。不过,至于它的执行是否是并发安全的,就要看其中的case表达式以及分支中,是否包含并发不安全的代码了。

上面写的有些多,简单总结一下:执行select时,会从左到右,从上到下,对每个case表达式求值,当所有case求值完毕后,会挑选满足的case执行,如果有多条都满足,就随机选择一条;如果都没有满足,就执行default;如果连default都没有,就阻塞住,等有满足条件的case出现时,再执行。

7.4 并发实例:海外商城Push

关于channel,零碎的知识点非常多,我还是想通过一个完整的示例,将这些知识点全部串起来,下面就以海外商城Push为例,将上面知识应用到实际场景中。

7.4.1 示例介绍

海外商城需要对W个业务方发送Push,针对每个业务方,为了提高Push的并发能力,采用N个协程从EMQ中读取数据(EMQ中都一个消息队列,里面缓存了大量的Push数据),数据读取后进行处理,然后将处理后的数据写到channel中。同时,服务有M个协程从channel中取出数据并消费,然后通过小米Push SDK,给用户发送Push。整体发送链路如下:

在看后面的内容前,我先抛出几个问题:

7.4.2 初始化

初始化channel数组,数组里面是每个业务方appTypes的channel,channel的缓存区大小为30,并启动10个消费者协程:


var (
   messageChan    map[string]chan *WorkMessage  // channel
   stopMasterChan chan bool                     // 消费者结束通知
   appTypes = map[int32]string{1: "shop", 2: "bbs", 3: "sharesave"}
)
func initPushChannel() {
   maxSize = 30 // channel缓存区大小
   workNum = 10 // goroutine个数
   stopMasterChan = make(chan bool)
   messageChan = make(map[string]chan *WorkMessage)
   for _, name := range appTypes {
      workChan := make(chan *WorkMessage, maxSize)
      messageChan[name] = workChan
      for i := 0; i < workNum; i++ {
         go startMaster(name, workChan) // 启动消费者协程
      }
   }
}
func startMaster(name string, workChan chan *WorkMessage) {
   for {
      if exit := dostartMaster(name, workChan); exit {
         return
      }
   }
}

初始化EMQ的Client,并启动10个生产者协程:

var (
   clientFactory  client.ClientFactory // EMQ Client
   stopChan       chan bool            // 生产者结束通知
)
func initEmq() {
   // 初始化EMQ的Client和单次读取数据条数,该处代码省略...
   maxConsumerNum := 10
   stopChan = make(chan bool)
   for i := 0; i < maxConsumerNum; i++ {
      go receiveMsg(i) // 启动生产者协程
   }
}
func receiveMsg(queueID int) {
   for {
      if exit := doReceiveMsg(queueID); exit {
         logz.Info("stop receive msg ...", logz.F("queueID", queueID))
         return
      }
   }
}

主方法调用:

func InitWorker() {
   // 初始化push SDK,逻辑省略...
   initPushChannel() // 初始化Channel,启动消费者
   initEmq()         // 启动生产者
}

7.4.3 发送

func doReceiveMsg(queueID int) bool {
   defer func() {
      if err := recover(); err != nil {
         println("[panic] recover from error.")
      }
   }()
   ticker := time.NewTicker(time.Second)
   for {
      select {
      case <-ticker.C:
         // 1. 从EMQ获取数据List,逻辑省略...
         // 2. 遍历List,获取业务类型,逻辑省略...
         // 3. 根据业务类型,获取对应的channel
         name := "sharesave"  // 示例数据
         pushChannel, _ := messageChan[name]
         // 4. 构造Push数据,然后放入channel
         pushData := &WorkMessage{AppLocal: "id", AppType: 1} // 示例数据
         pushChannel <- pushData
      case <-stopChan:
         println("stop to send data to channel.")
         return true
      }
   }
}

这部分代码我做了大量简化,这里主要做了2件事情:

  1. 通过select + 定时器,每隔1S就会从EMQ中获取数据,然后将构造后的数据放入对应业务的channel;
  2. 当收到stopChan事件时,会通知所有的生产者协程,退出goroutine,这里其实就是协程退出的方式之一。

7.4.4 接收

func dostartMaster(name string, workChan chan *WorkMessage) bool {
   defer func() {
      if err := recover(); err != nil {
         println("[panic] recover from error.")
      }
   }()
   for {
      select {
      case t := <-workChan:
         if t != nil {
             for _, message := range t.PushMessages {
                // 接受channel数据t,将数据推给Push SDK
                // 逻辑省略...
             }
         }
      case <-stopMasterChan:
         println("stop to get data from channel.")
         return true
      }
   }
}

这部分代码同样做了大量简化,这里主要做了2件事情:

  1. 通过select,如果channel里面有数据,直接读取,然后给用户发送Push;
  2. 当收到stopMasterChan事件时,会通知所有的生产者协程,退出goroutine。

7.4.5 关闭

// 通知生产者协程关闭,协程不再写channel
func stopRecvMsgFromQueue() {
   close(stopChan)
}
// 通知消费者协程关闭,协程不再读channel,并关闭channel,消费完channel中剩余消息
func stopPushChannel() {
   close(stopMasterChan)
   time.Sleep(time.Second)
   for _, c := range messageChan {
      close(c)
      for msg := range c {
         if msg != nil {
            for _, message := range msg.PushMessages {
               // 接受channel数据t,将数据推给Push SDK
               // 逻辑省略...
            }
         }
      }
   }
}
// 主方法调用
func StopWorker() {
   stopRecvMsgFromQueue()
   time.Sleep(time.Second * 2)
   stopPushChannel()
}

比如服务重启,需要关闭协程时,主要做以下事情:

  1. 执行close(stopChan),先通知生产者协程,不再往channel里面写数据;
  2. 执行close(stopMasterChan),通知消费者协程,不再从channel里面读取数据;
  3. 关闭数组messageChan的每个channel;
  4. 继续读取channel中剩余的数据,因为使用的是for-range方式,所以当channel里面所有的数据读取完毕后,for-range会自动退出。

这里有两个地方sleep了一下,分别有以下作用:

  1. 调用stopPushChannel()前sleep:关闭生成者后,消费者继续消费剩余的数据;
  2. 调用close(c)前sleep:避免协程未完全关闭,导致往关闭的channel写数据,导致panic。

7.5 总结

本章基本都是干货,上面总结的比较全面,这里就不再重复了,如果你能回答我提的这些问题,你应该就掌握了本章的内容:

最后就是Push的并发示例,强烈建议大家能掌握,掌握了这个示例,后续你应该也能很容易通过channel实现数据共享,并结合goroutine写出你自己的高并发程序。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8