回顾之前的知识,我们先看一个关于WaitGroup的示例:
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(2*time.Second)
fmt.Println("1号完成")
wg.Done()
}()
go func() {
time.Sleep(2*time.Second)
fmt.Println("2号完成")
wg.Done()
}()
wg.Wait()
fmt.Println("好了,大家都干完了,放工")
}
示例比较简单,main协程等待两个goroutine的结束。如果是希望主协程关闭,通知goutoutine关闭,我们可以使用select + chan的方式:
func main() {
stop := make(chan bool)
go func() {
for {
select {
case <-stop:
fmt.Println("监控退出,停止了...")
return
default:
fmt.Println("goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}()
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
stop<- true
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
这种chan+select的方式,是比较优雅的结束一个goroutine的方式,不过这种方式也有局限性,如果有很多goroutine都需要控制结束怎么办呢?如果这些goroutine又衍生了其他更多的goroutine怎么办呢?如果一层层的无穷尽的goroutine呢?这就非常复杂了,即使我们定义很多chan也很难解决这个问题,因为goroutine的关系链就导致了这种场景非常复杂。
上面说的这种场景是存在的,比如一个网络请求Request,每个Request都需要开启一个goroutine做一些事情,这些goroutine又可能会开启其他的goroutine。所以我们需要一种可以跟踪goroutine的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的Context,称之为上下文非常贴切,它就是goroutine的上下文,我们对上面示例进行改造:
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("监控退出,停止了...")
return
default:
fmt.Println("goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}(ctx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
当执行cancel()时,goroutine会接收到ctx.Done()的信号,协程退出,对于控制多个goroutine的示例如下:
func main() {
ctx, cancel := context.WithCancel(context.Background())
go watch(ctx,"【监控1】")
go watch(ctx,"【监控2】")
go watch(ctx,"【监控3】")
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println(name,"监控退出,停止了...")
return
default:
fmt.Println(name,"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
11.2.1 Context接口
Context的接口定义的比较简洁,我们看下这个接口的方法:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
这个接口共有4个方法,了解这些方法的意思非常重要,这样我们才可以更好的使用他们:
11.2.2 顶层Context
Context接口并不需要我们实现,Go内置已经帮我们实现了2个,我们代码中最开始都是以这两个内置的作为最顶层的partent context,衍生出更多的子Context:
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
一个是Background,主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。一个是TODO,它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。他们两个本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
这就是emptyCtx实现Context接口的方法,可以看到,这些方法什么都没做,返回的都是nil或者零值。
11.2.3 子Context
有了如上的根Context,那么是如何衍生更多的子Context的呢?这就要靠context包为我们提供的With系列的函数了:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。
大家可能留意到,前三个函数都返回一个取消函数CancelFunc,这就是取消函数的类型,该函数可以取消一个Context,以及这个节点Context下所有的所有的Context,不管有多少层级。
11.2.4 元数传递
通过Context我们也可以传递一些必须的元数据,这些数据会附加在Context上以供使用。
var key string="name"
func main() {
ctx, cancel := context.WithCancel(context.Background())
//附加值
valueCtx:=context.WithValue(ctx,key,"【监控1】")
go watch(valueCtx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context) {
for {
select {
case <-ctx.Done():
//取出值
fmt.Println(ctx.Value(key),"监控退出,停止了...")
return
default:
//取出值
fmt.Println(ctx.Value(key),"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
在前面的例子,我们通过传递参数的方式,把name的值传递给监控函数。在这个例子里,我们实现一样的效果,但是通过的是Context的Value的方式。我们可以使用context.WithValue方法附加一对K-V的键值对,这里Key必须是等价性的,也就是具有可比性;Value值要是线程安全的。这样我们就生成了一个新的Context,这个新的Context带有这个键值对,在使用的时候,可以通过Value方法读取ctx.Value(key)。
这里我们主要先讨论一下撤销的操作。Done方法会返回一个元素类型为struct{}的接收通道,不过,这个接收通道的用途并不是传递元素值,而是让调用方去感知“撤销”当前Context值的那个信号,一旦当前的Context值被撤销,这里的接收通道就会被立即关闭,因为对于一个未包含任何元素值的通道来说,它的关闭会使任何针对它的接收操作立即结束。这里解释的可能有点绕,或者换句话来说,如果Context取消的时候,它其实主要是关闭chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着可以通过Done收到Context取消的信号了。
除了让Context值的使用方感知到撤销信号,让它们得到“撤销”的具体原因,有时也是很有必要的。后者即是Context类型的Err方法的作用。该方法的结果是error类型的,并且其值只可能等于context.Canceled变量的值,或者context.DeadlineExceeded变量的值,我们看一个经典用法:
func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
我们再讨论撤销信号是如何在上下文树中传播的,在撤销函数被调用之后,对应的Context值会先关闭它内部的接收通道,也就是它的Done方法会返回的那个通道。然后,它会向它的所有子值(或者说子节点)传达撤销信号,这些子值会如法炮制,把撤销信号继续传播下去。最后,这个Context值会断开它与其父值之间的关联。先看一幅图:
我们通过调用context包的WithDeadline函数或者WithTimeout函数生成的Context值也是可撤销的。它们不但可以被手动撤销,还会依据在生成时被给定的过期时间,自动地进行定时撤销,这里定时撤销的功能是借助它们内部的计时器来实现的。当过期时间到达时,这两种Context值的行为与Context值被手动撤销时的行为是几乎一致的,只不过前者会在最后停止并释放掉其内部的计时器。最后要注意,通过调用context.WithValue函数得到的Context值是不可撤销的,撤销信号在被传播时,若遇到它们则会直接跨过,并试图将信号直接传给它们的子值。
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// 作用:1.随机sleep一会;2.如果入参ch不为空,会把sleep的时间给到ch
func sleepRandom(fromFunction string, ch chan int) {
defer func() { fmt.Println(fromFunction, "sleepRandom complete") }()
seed := time.Now().UnixNano()
r := rand.New(rand.NewSource(seed))
randomNumber := r.Intn(100)
sleeptime := randomNumber + 100
fmt.Println(fromFunction, "Starting sleep for", sleeptime, "ms")
time.Sleep(time.Duration(sleeptime) * time.Millisecond)
fmt.Println(fromFunction, "Waking up, slept for ", sleeptime, "ms")
if ch != nil {
ch <- sleeptime
}
}
func sleepRandomContext(ctx context.Context, ch chan bool) {
defer func() {
fmt.Println("sleepRandomContext complete")
// 通过channel,通知上游执行完毕
ch <- true
}()
sleeptimeChan := make(chan int)
// 开启新的协程G2,让该协程执行逻辑,执行完毕后,通过sleeptimeChan通知执行完毕
go sleepRandom("sleepRandomContext", sleeptimeChan)
select {
case <-ctx.Done():
// 场景1:main()调用cancelFunction()
// 场景2:doWorkContext()调用cancelFunction()
// 场景3:doWorkContext()自动超时
fmt.Println("sleepRandomContext: Time to return")
case sleeptime := <-sleeptimeChan:
// 当新的协程G2执行完毕,调用ch<-sleeptime时
fmt.Println("Slept for ", sleeptime, "ms")
}
}
func doWorkContext(ctx context.Context) {
// 生成新的ctx,超时时间为150ms
ctxWithTimeout, cancelFunction := context.WithTimeout(ctx, time.Duration(150)*time.Millisecond)
defer func() {
fmt.Println("doWorkContext complete")
// 下游所有的ctx都会关闭
cancelFunction()
}()
ch := make(chan bool)
// 启动新的协程G1
go sleepRandomContext(ctxWithTimeout, ch)
select {
case <-ctx.Done():
// 当main退出,调用main的cancelFunction()时
fmt.Println("doWorkContext: Time to return")
case <-ch:
// 当新的协程G1退出,执行ch<-true时
fmt.Println("sleepRandomContext returned")
}
}
func main() {
ctx := context.Background()
ctxWithCancel, cancelFunction := context.WithCancel(ctx)
defer func() {
fmt.Println("Main Defer: canceling context")
// 下游所有的ctx都会关闭
cancelFunction()
}()
go func() {
// main函数sleep一会
sleepRandom("Main", nil)
// 下游所有的ctx都会关闭
cancelFunction()
fmt.Println("Main Sleep complete. canceling context")
}()
doWorkContext(ctxWithCancel)
}
对于上面这个示例,我描述一下每种场景:
前面3个是正常场景,后面3个是异常场景,无论哪种场景,设计思路是,当前函数退出时,下游所有context需要全部关闭,这个是依赖context可传递的特性,同时也能通知上游“我已经关闭了,请你继续你后续的操作”。
我们今天主要讨论的是context包中的函数和Context类型,该包中的函数都是用于产生新的Context类型值的,Context类型是一个可以帮助我们实现多goroutine 协作流程的同步工具,不但如此,我们还可以通过此类型的值传达撤销信号或传递数据。
Context类型的实际值大体上分为三种,即:根Context值、可撤销的Context值和含数据的Context值。所有的Context值共同构成了一颗上下文树,这棵树的作用域是全局的,而根Context值就是这棵树的根,它是全局唯一的,并且不提供任何额外的功能。
可撤销的Context值又分为:只可手动撤销的Context值,和可以定时撤销的Context值,我们可以通过生成它们时得到的撤销函数来对其进行手动的撤销。对于后者,定时撤销的时间必须在生成时就完全确定,并且不能更改,不过我们可以在过期时间达到之前,对其进行手动的撤销,一旦撤销函数被调用,撤销信号就会立即被传达给对应的Context值,并由该值的Done方法返回的接收通道表达出来。“撤销”这个操作是Context值能够协调多个 goroutine 的关键所在,撤销信号总是会沿着上下文树叶子节点的方向传播开来。含数据的Context值不能被撤销,而可撤销的Context值又无法携带数据,由于它们共同组成了一个有机的整体(即上下文树),所以在功能上要比sync.WaitGroup强大得多。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8