Go源码解读之sync/atomic
Table of Contents
注:go version 1.16.x
Overview⌗
从网站pkg.go.dev上可以看到,对应的解释。
atomic包提供了用于实现同步算法的低级原子内存原语。
可以分为几类操作:
- Add操作:加减操作
- CAS操作:先比较后赋值操作
- Swap操作:赋值操作
- Load操作:从某个地址中取值
- Store操作:往某个地址赋值
- Value类型:对任意类型的Load/Store操作封装
操作分类⌗
Add操作⌗
由AddT函数实现的加法操作在原子上等效于:
*addr += delta \\ 加上步长 正负数都可以
return *addr \\ 反回加后的结果
相关的方法有:
func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
CAS操作⌗
CAS即CompareAndSwap,这个函数主要就是先比较一下当前传入的地址的值是否和 old 值相等,如果相等,就赋值新值返回 true,如果不相等就返回 false.
利用这个方法可以实现锁机制。
相关的方法有:
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
从源码runtime/internal/atomic/asm_amd64.s中可以看到CAS对应的汇编指令实现。
如CompareAndSwapInt32
方法的实现:
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// } else
// return 0;
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET
从汇编指令可以看出,使用了CPU的LOCK
指令来保证原子的操作,而CMPXCHGL
指令则是CPU级别实现的CAS操作。
关于LOCK指令,可以发散到CPU指令的锁总线、锁缓存等内容,而CPU会有多级缓存,这就需要通过缓存一致性去保证原子性。而MESI是缓存一致性协议的一种实现方法。
CAS不能解决的问题⌗
- CAS在共享资源竞争比较激烈的时候,每个goroutine会容易处于自旋状态,影响效率,在竞争激烈的时候推荐使用锁。
- 无法解决ABA问题
ABA问题是无锁结构实现中常见的一种问题,可基本表述为: 进程P1读取了一个数值A P1被挂起(时间片耗尽、中断等),进程P2开始执行 P2修改数值A为数值B,然后又修改回A P1被唤醒,比较后发现数值A没有变化,程序继续执行。
Swap操作⌗
Swap操作会执行交换操作,并且返回原来的值。
old = *addr // 复制原来的值
*addr = new // 赋值为新的值
return old // 返回原来的值
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
Load操作⌗
从某个地址里获取对应的值:
return *addr
方法签名:
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
Store操作⌗
往某个地址里赋值:
*addr = val
方法签名:
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
Value类型⌗
从源码https://go.googlesource.com/go/+/go1.16.3/src/sync/atomic/value.go#16可以看到:
定义:
type Value struct {
v interface{}
}
func (v *Value) Load() (x interface{})
func (v *Value) Store(x interface{})
源码如下:
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package atomic
import (
"unsafe"
)
// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
v interface{}
}
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}
// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
if x == nil {
panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v))
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}
// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()
应用⌗
atomic.Value这种类型常用于读多写少的场景,比如配置结构体的热更新等。
例子COW(Copy On Write)⌗
写时复制:(英语:Copy-on-write,简称 COW)是一种计算机程序设计领域的优化策略。其核心思想是,如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的。此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。
通俗的讲就是,需要写入的时候我先把老的数据复制一份到一个新的对象,然后再写入新的值。
存在的问题:老的对象可能会被其他goroutine使用而导致不回立即被垃圾回收,对于多写的场景会产生大量副本,从而导致性能下降。
优点:无锁(或者说很轻量的锁),所以也不会有 goroutine 的上下文切换,并且在读取的时候大家都读取的相同的副本所以性能上回好一些。
COW 策略在 linux, redis 中都有很多的实践。
package main
import (
"fmt"
"sync/atomic"
)
type Map struct {
v atomic.Value
//writeMu sync.Mutex
}
func NewMap() *Map {
res := &Map{
v: atomic.Value{},
//writeMu: sync.Mutex{},
}
res.v.Store(make(map[string]string))
return res
}
func (m *Map) Get(key string) string {
m1 := m.v.Load().(map[string]string)
return m1[key]
}
func (m *Map) Put(key, val string) {
//m.writeMu.Lock() // synchronize with other potential writers
//defer m.writeMu.Unlock()
m1 := m.v.Load().(map[string]string) // load current value of the data structure
m2 := make(map[string]string) // create a new value
for k, v := range m1 {
m2[k] = v // copy all data from the current object to the new one
}
m2[key] = val // do the update that we need
m.v.Store(m2) // atomically replace the current object with the new one
// At this point all new readers start working with the new version.
// The old version will be garbage collected once the existing readers
// (if any) are done with it.
}
// COW
func main() {
m := NewMap()
m.Put("key", "value")
fmt.Println(m.Get("key"))
}