Go源码解读之sync.Cond
Table of Contents
前言⌗
前面🔗说过,Cond实现了一个条件变量,是等待或宣布一个事件发生的goroutines的汇合点。
就是说,使用sync.Cond可以做到多个协程等待某个协程通知的场景。
使用channel可以实现一读一写的场景,而Cond则实现多读一写的场景。
源码解析⌗
简化版方法签名:
// Cond结构体
type Cond struct {}
// NewCond 返回带Locker的Cond,这个Locker可以是
// *Mutex 或 *RWMutex
func NewCond(l Locker) *Cond {}
// 等待L的解锁并挂起goroutine
func (c *Cond) Wait() {}
// 唤醒1个因c阻塞的goroutine,
// 如果在Signal之后才Wait会导致all goroutines are asleep - deadlock
func (c *Cond) Signal() {}
// 唤醒所有因c阻塞的goroutine
// 如果在Broadcast之后才Wait会导致all goroutines are asleep - deadlock
func (c *Cond) Broadcast() {}
因此,在Signal或者Broadcast前要先保证目标的协程已经进入了Wait状态,否则会导致死锁。因为Signal或者Broadcast只唤醒当前正在被Wait阻塞的协程。
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
"sync/atomic"
"unsafe"
)
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
// Cond实现了一个条件变量,它是goroutines等待或宣布事件发生的集合点。
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
// 每个Cond都有一个相关的Locker L(通常是一个*Mutex或*RWMutex),
// 在改变条件和调用Wait方法时,必须持有这个L。
// A Cond must not be copied after first use.
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
// notifyList是用于实现sync.Cond的基于票证的通知列表
// 参考:
// https://go.googlesource.com/go/+/go1.16.4/src/runtime/sema.go#446
notify notifyList
checker copyChecker
}
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
// Wait会先把加入到待唤醒队列,再释放锁,然后执行等待,
// 当其他goroutine调用Broadcast或者Signal来通知其恢复执行后,
// 会重新上锁
func (c *Cond) Wait() {
c.checker.check()
// notifyListAdd将调用者添加到通知列表中,以便它可以接收通知。
// 调用者必须最终调用notifyListWait来等待这样的通知,并显式传参。
// 参考:
// https://go.googlesource.com/go/+/go1.16.4/src/sync/runtime.go#31
// https://go.googlesource.com/go/+/go1.16.4/src/runtime/sema.go#475
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// notifyListWait等待通知。如果自那以后已发送,
// 调用notifyListAdd,它立即返回。否则,它将阻塞。
// 参考:
// https://go.googlesource.com/go/+/go1.16.4/src/sync/runtime.go#34
// https://go.googlesource.com/go/+/go1.16.4/src/runtime/sema.go#485
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
使用例子⌗
可以按照官方给的使用例子:
// c.L.Lock()
// 某个条件满足才进行Wait,否则可能会导致Wait发生在
// 唤醒之后,从而导致死锁
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
如:
package main
import (
"fmt"
"sync"
"time"
)
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
var condition = false
func main() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func(x int) {
cond.L.Lock() //获取锁
defer cond.L.Unlock() //释放锁
for !condition {
cond.Wait()
}
fmt.Println(x)
wg.Done()
}(i)
}
time.Sleep(time.Second * 3)
cond.L.Lock()
condition = true
cond.Broadcast()
cond.L.Unlock()
wg.Wait()
}
注意:sync.Cond需要开发人员把握锁以及condition()的条件,比较容易发生死锁。
其他代码中的应用⌗
k8s中的client-go代码中的DeltaFIFO队列实现就是用了sync.Cond来实现:
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// 其他字段省略
}
初始化:
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
// f.lock 与 f.cond 结合使用
f.cond.L = &f.lock
return f
}
再来看看Pop方法,何时调用Wait:
// Pop阻塞,直到队列中有一些项目,然后返回一个。
// 如果多个项目准备就绪,它们将按照它们被添加/更新的顺序返回。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 有数据就不用Wait
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// 当队列为空时,将阻止Pop()的调用,直到新项目入队。
// When Close() is called, the f.closed is set and the condition is broadcasted.
// 当Close()被调用时,f.closed被设置,条件被广播。
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}
// 阻塞直到被广播(说明有数据)
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
何时调用Broadcast:
// Close the queue.
func (f *DeltaFIFO) Close() {
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
// already holds the fifo lock.
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
if _, exists := f.items[id]; exists {
return
}
f.queue = append(f.queue, id)
f.items[id] = deltas
// 有数据,则广播
f.cond.Broadcast()
}
// queueActionLocked appends to the delta list for the object.
// queueActionLocked追加到该对象的delta列表。
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
更新于 2021/05/16 23:53
Read other posts