Go语言核心手册-10.原子操作

309次阅读  |  发布于2年以前

10.1 内容前导

上一章我们学到,互斥锁是一个很有用的同步工具,它可以保证每一时刻进入临界区的goroutine只有一个。通过对互斥锁的合理使用,我们可以使一个goroutine在执行临界区中的代码时,不被其他的goroutine打扰,但是它仍然可能会被中断(interruption)。

那什么是原子操作呢?我们已经知道,原子操作即是进行过程中不能被中断的操作。也就是说,针对某个值的原子操作在被进行的过程当中,CPU绝不会再去进行其它的针对该值的操作。为了实现这样的严谨性,原子操由 CPU 提供芯片级别的支持,所以绝对有效,即使在拥有多 CPU 核心,或者多 CPU 的计算机系统中,原子操作的保证也是不可撼动的。这使得原子操作可以完全地消除竞态条件,并能够绝对地保证并发安全性,它的执行速度要比其他的同步工具快得多,通常会高出好几个数量级。

不过它的缺点也很明显,正因为原子操作不能被中断,所以它需要足够简单,并且要求快速。你可以想象一下,如果原子操作迟迟不能完成,而它又不会被中断,那么将会给计算机执行指令的效率带来多么大的影响,所以操作系统层面只对针对二进制位或整数的原子操作提供了支持。

因此,我们可以结合实际情况,来判断是否可以将锁替换成原子操作。

10.2 用法说明

10.2.1 增或减

atomic包中提供了如下以Add为前缀的增减操作:


func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

需要注意的是,第一个参数必须是指针类型的值,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的CPU指令,确保同一时间只有一个goroutine能够进行操作,看个简单的示例:


var opts int64 = 0
for i := 0; i < 50; i++ { 
    // 注意第一个参数必须是地址
    atomic.AddInt64(&opts, 3) //加操作
    //atomic.AddInt64(&opts, -1) 减操作
    time.Sleep(time.Millisecond)
    }
time.Sleep(time.Second)
fmt.Println("opts: ", atomic.LoadInt64(&opts))

用于原子加法操作的函数可以做原子减法吗?比如atomic.AddInt32函数可以用于减小那个被操作的整数值吗?atomic.AddInt32函数的第二个参数代表差量,它的类型int32是有符号的,如果我们想做原子减法,那么把这个差量设置为负整数就可以了,对于atomic.AddInt64函数来说也是类似的。

不过如果想用atomic.AddUint32和atomic.AddUint64函数做原子减法,因为它们的第二个参数的类型uint32和uint64都是无符号的,就不能同AddInt32进行相同处理,但是可以依据下面这个表达式来给定atomic.AddUint32函数的第二个参数值:

^uint32(-N-1))

其中的N代表由负整数表示的差量,我们先要把差量的绝对值减去1,然后再把得到的这个无类型的整数常量,转换为uint32类型的值,最后在该值之上做按位异或操作,就可以获得最终的参数值。简单来说,此表达式的结果值的补码,与使用前一种方法得到的值的补码相同,所以这两种方式是等价的。

10.2.2 比较并交换

该操作简称 CAS(Compare And Swap),第一个参数的值应该是指向被操作值的指针值,该值的类型即为*int32,后两个参数的类型都是int32类型,它们的值应该分别代表被操作值的旧值和新值,函数在被调用之后会先判断参数addr指向的被操作值与参数old的值是否相等。仅当此判断得到肯定的结果之后,该函数才会用参数new代表的新值替换掉原先的旧值。否则,后面的替换操作就会被忽略。

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

当有大量的goroutine 对变量进行读写操作时,可能导致CAS操作无法成功,这时可以利用for循环多次尝试:


var value int64
func atomicAddOp(tmp int64) {
    for {
        oldValue := value
        if atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {
            return
        }
    }
}

比较并交换操作与交换操作相比有什么不同,优势在哪里呢?比较并交换操作即 CAS 操作,是有条件的交换操作,只有在条件满足的情况下才会进行值的交换。CAS 操作并不是单一的操作,而是一种操作组合,这与其他的原子操作都不同。正因为如此,它的用途要更广泛一些,例如我们将它与for语句联用就可以实现一种简易的自旋锁(spinlock):

for {
    if atomic.CompareAndSwapInt32(&num2, 10, 0) {
        fmt.Println("The second number has gone to zero.")
        break
    }
    time.Sleep(time.Millisecond * 500)
}

10.2.3 载入

atomic包中提供了如下以Load为前缀的增减操作:

func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)

载入操作能够保证原子的读变量的值,当读取的时候,任何其他CPU操作都无法对该变量进行读写,其实现机制受到底层硬件的支持。假设我已经保证了对一个变量的写操作都是原子操作,比如:加或减、存储、交换等等,那我对它进行读操作的时候,还有必要使用原子操作吗?

答案是很有必要,你可以对照一下读写锁,为什么在读写锁保护下的写操作和读操作之间是互斥的?这是为了防止读操作读到没有被修改完的值,如果写操作还没有进行完,读操作就来读了,那么就只能读到仅修改了一部分的值,这显然破坏了值的完整性。因此一旦决定要对一个共享资源进行保护,那就要做到完全的保护,不完全的保护基本上与不保护没有什么区别。

10.2.4 存储

atomic包中提供了如下以Store为前缀的存储操作:


func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)

此类操作确保了写变量的原子性,避免其他操作读到了修改变量过程中的脏数据。然后对于存储,需要掌握2条规则:

10.2.5 交换

atomic包中提供了如下以Swap为前缀的交换操作:

func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

相对于CAS,明显此类操作更为暴力直接,并不管变量的旧值是否被改变,直接赋予新值然后返回背替换的值。

10.3 扩展知识

对于原子操作,还有几条具体的使用建议:

  1. 不要把内部使用的原子值暴露给外界。比如,声明一个全局的原子变量并不是一个正确的做法,这个变量的访问权限最起码也应该是包级私有的。
  2. 如果不得不让包外,或模块外的代码使用你的原子值,那么可以声明一个包级私有的原子变量,然后再通过一个或多个公开的函数,让外界间接地使用到它。注意,这种情况下不要把原子值传递到外界,不论是传递原子值本身还是它的指针值。
  3. 如果通过某个函数可以向内部的原子值存储值的话,那么就应该在这个函数中先判断被存储值类型的合法性。若不合法,则应该直接返回对应的错误值,从而避免 panic 的发生。
  4. 如果可能的话,我们可以把原子值封装到一个数据类型中,比如一个结构体类型。这样,我们既可以通过该类型的方法更加安全地存储值,又可以在该类型中包含可存储值的合法类型信息。

除了上述使用建议之外,我还要再特别强调一点:尽量不要向原子值中存储引用类型的值。因为这很容易造成安全漏洞。请看下面的代码:

var box6 atomic.Value
v6 := []int{1, 2, 3}
box6.Store(v6)
v6[1] = 4 // 注意,此处的操作不是并发安全的!

我把一个[]int类型的切片值v6存入了原子值box6,由于切片类型属于引用类型,我在外面改动这个切片值,就等于修改了box6中存储的那个值,这相当于绕过了原子值而进行了非并发安全的操作,那么应该怎样修补这个漏洞呢?可以这样做:

store := func(v []int) {
 replica := make([]int, len(v))
 copy(replica, v)
 box6.Store(replica)
}
store(v6)
v6[2] = 5 // 此处的操作是安全的。

我先为切片值v6创建了一个完全的副本,这个副本涉及的数据已经与原值毫不相干,然后再把这个副本存入box6,因此无论我再对v6的值做怎样的修改,都不会破坏box6提供的安全保护。

10.4 实战场景:环形队列

// 环形队列
type RingBuffer struct {
   err   error
   count int32
   size  int32
   head  int32
   tail  int32
   buf   []unsafe.Pointer
}
// Get方法从buf中取出对象
func (r *RingBuffer) Get() interface{} {
   // 在高并发开始的时候,队列容易空,直接判断空性能最优
   if atomic.LoadInt32(&r.count) <= 0 {
      return nil
   }
   // 当扣减数量后没有超,就从队列里取出对象
   if atomic.AddInt32(&r.count, -1) >= 0 {
      idx := (atomic.AddInt32(&r.head, 1) - 1) % r.size
      if obj := atomic.LoadPointer(&r.buf[idx]); obj != unsafe.Pointer(nil) {
         o := *(*interface{})(obj)
         atomic.StorePointer(&r.buf[idx], nil)
         return o
      }
   } else {
      // 当减数量超了,再加回去
      atomic.AddInt32(&r.count, 1)
   }
   return nil
}
// Put方法将对象放回到buf中。如果buf满了,返回false
func (r *RingBuffer) Put(obj interface{}) bool {
   // 在高并发结束的时候,队列容易满,直接判满性能最优
   if atomic.LoadInt32(&r.count) >= r.size {
      return false
   }
   // 当增加数量后没有超,就将对象放到队列里
   if atomic.AddInt32(&r.count, 1) <= r.size {
      idx := (atomic.AddInt32(&r.tail, 1) - 1) % r.size
      atomic.StorePointer(&r.buf[idx], unsafe.Pointer(&obj))
      return true
   }
   // 当加的数量超了,再减回去
   atomic.AddInt32(&r.count, -1)
   return false
}

10.5 总结

原子值类型的优势很明显,但它的使用规则也更多一些。首先,在首次真正使用后,原子值就不应该再被复制了。其次,原子值的Store方法对其参数值(也就是被存储值)有两个强制的约束。一个约束是参数值不能为nil。另一个约束是,参数值的类型不能与首个被存储值的类型不同。也就是说,一旦一个原子值存储了某个类型的值,那它以后就只能存储这个类型的值了。最后在扩展知识中,提出了几条使用建议,包括:不要对外暴露原子变量、不要传递原子值及其指针值、尽量不要在原子值中存储引用类型的值等。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8