注:go version 1.16.x

Overview

从网站pkg.go.dev上可以看到,对应的解释。

atomic包提供了用于实现同步算法的低级原子内存原语。

可以分为几类操作:

  • Add操作:加减操作
  • CAS操作:先比较后赋值操作
  • Swap操作:赋值操作
  • Load操作:从某个地址中取值
  • Store操作:往某个地址赋值
  • Value类型:对任意类型的Load/Store操作封装

操作分类

Add操作

由AddT函数实现的加法操作在原子上等效于:

*addr += delta  \\ 加上步长 正负数都可以
return *addr    \\ 反回加后的结果

相关的方法有:

func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

CAS操作

CAS即CompareAndSwap,这个函数主要就是先比较一下当前传入的地址的值是否和 old 值相等,如果相等,就赋值新值返回 true,如果不相等就返回 false.

利用这个方法可以实现锁机制。

相关的方法有:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

从源码runtime/internal/atomic/asm_amd64.s中可以看到CAS对应的汇编指令实现。 如CompareAndSwapInt32方法的实现:

// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
//	if(*val == old){
//		*val = new;
//		return 1;
//	} else
//		return 0;
TEXT runtimeinternalatomic·Cas(SB),NOSPLIT,$0-17
	MOVQ	ptr+0(FP), BX
	MOVL	old+8(FP), AX
	MOVL	new+12(FP), CX
	LOCK
	CMPXCHGL	CX, 0(BX)
	SETEQ	ret+16(FP)
	RET

从汇编指令可以看出,使用了CPU的LOCK指令来保证原子的操作,而CMPXCHGL指令则是CPU级别实现的CAS操作。

关于LOCK指令,可以发散到CPU指令的锁总线、锁缓存等内容,而CPU会有多级缓存,这就需要通过缓存一致性去保证原子性。而MESI是缓存一致性协议的一种实现方法。

CAS不能解决的问题

  • CAS在共享资源竞争比较激烈的时候,每个goroutine会容易处于自旋状态,影响效率,在竞争激烈的时候推荐使用锁。
  • 无法解决ABA问题

ABA问题是无锁结构实现中常见的一种问题,可基本表述为: 进程P1读取了一个数值A P1被挂起(时间片耗尽、中断等),进程P2开始执行 P2修改数值A为数值B,然后又修改回A P1被唤醒,比较后发现数值A没有变化,程序继续执行。

Swap操作

Swap操作会执行交换操作,并且返回原来的值。

old = *addr  // 复制原来的值
*addr = new  // 赋值为新的值
return old   // 返回原来的值
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

Load操作

从某个地址里获取对应的值:

return *addr

方法签名:

func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)

Store操作

往某个地址里赋值:

*addr = val

方法签名:

func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)

Value类型

从源码https://go.googlesource.com/go/+/go1.16.3/src/sync/atomic/value.go#16可以看到:

定义:

type Value struct {
	v interface{}
}
func (v *Value) Load() (x interface{})
func (v *Value) Store(x interface{}) 

源码如下:

// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package atomic
import (
	"unsafe"
)
// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
	v interface{}
}
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (x interface{}) {
	vp := (*ifaceWords)(unsafe.Pointer(v))
	typ := LoadPointer(&vp.typ)
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		// First store not yet completed.
		return nil
	}
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}
// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	vp := (*ifaceWords)(unsafe.Pointer(v))
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	for {
		typ := LoadPointer(&vp.typ)
		if typ == nil {
			// Attempt to start first store.
			// Disable preemption so that other goroutines can use
			// active spin wait to wait for completion; and so that
			// GC does not see the fake type accidentally.
			runtime_procPin()
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				runtime_procUnpin()
				continue
			}
			// Complete first store.
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin()
			return
		}
		if uintptr(typ) == ^uintptr(0) {
			// First store in progress. Wait.
			// Since we disable preemption around the first store,
			// we can wait with active spinning.
			continue
		}
		// First store completed. Check type and overwrite data.
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		StorePointer(&vp.data, xp.data)
		return
	}
}
// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()

应用

atomic.Value这种类型常用于读多写少的场景,比如配置结构体的热更新等。

例子COW(Copy On Write)

写时复制:(英语:Copy-on-write,简称 COW)是一种计算机程序设计领域的优化策略。其核心思想是,如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的。此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。

维基百科

通俗的讲就是,需要写入的时候我先把老的数据复制一份到一个新的对象,然后再写入新的值。

存在的问题:老的对象可能会被其他goroutine使用而导致不回立即被垃圾回收,对于多写的场景会产生大量副本,从而导致性能下降。

优点:无锁(或者说很轻量的锁),所以也不会有 goroutine 的上下文切换,并且在读取的时候大家都读取的相同的副本所以性能上回好一些。

COW 策略在 linux, redis 中都有很多的实践。

package main

import (
	"fmt"
	"sync/atomic"
)

type Map struct {
	v       atomic.Value
	//writeMu sync.Mutex
}

func NewMap() *Map {
	res := &Map{
		v:       atomic.Value{},
		//writeMu: sync.Mutex{},
	}
	res.v.Store(make(map[string]string))
	return res
}

func (m *Map) Get(key string) string {
	m1 := m.v.Load().(map[string]string)
	return m1[key]
}

func (m *Map) Put(key, val string) {
	//m.writeMu.Lock() // synchronize with other potential writers
	//defer m.writeMu.Unlock()
	m1 := m.v.Load().(map[string]string) // load current value of the data structure
	m2 := make(map[string]string)        // create a new value
	for k, v := range m1 {
		m2[k] = v // copy all data from the current object to the new one
	}
	m2[key] = val // do the update that we need
	m.v.Store(m2) // atomically replace the current object with the new one
	// At this point all new readers start working with the new version.
	// The old version will be garbage collected once the existing readers
	// (if any) are done with it.
}

// COW
func main() {
	m := NewMap()
	m.Put("key", "value")
	fmt.Println(m.Get("key"))
}