前言

前面🔗说过,Cond实现了一个条件变量,是等待或宣布一个事件发生的goroutines的汇合点。

也就是说,使用sync.Cond可以实现多个协程同时等待某一个协程发出通知的场景。

在channel上的一次发送只会被一个接收者收到,适合一写一读的场景;而Cond的Broadcast可以一次唤醒所有等待者,适合一写多读的场景。

源码解析

简化版方法签名:

// Cond is the condition-variable struct (fields omitted in this simplified listing).
type Cond struct {}

// NewCond returns a Cond that uses l as its Locker; the Locker can be
// a *Mutex or a *RWMutex.
func NewCond(l Locker) *Cond {}

// Wait unlocks c.L and suspends the calling goroutine; after being woken
// it locks c.L again before returning.
func (c *Cond) Wait() {}

// Signal wakes one goroutine that is blocked on c.
// A Wait that starts only after the Signal is not woken by it; if nothing
// notifies again, the program can end with
// "all goroutines are asleep - deadlock".
func (c *Cond) Signal() {}

// Broadcast wakes all goroutines that are blocked on c.
// A Wait that starts only after the Broadcast is not woken by it; if nothing
// notifies again, the program can end with
// "all goroutines are asleep - deadlock".
func (c *Cond) Broadcast() {}

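因此,在Signal或者Broadcast之前,要先保证目标协程已经进入了Wait状态:Signal和Broadcast只会唤醒调用时正阻塞在Wait中的协程,通知本身不会被保留。如果Wait发生在通知之后,并且之后再也没有新的Signal/Broadcast,该协程就会一直阻塞;当所有协程都处于阻塞状态时,运行时会报all goroutines are asleep - deadlock。实际使用中应配合一个受锁保护的条件(见下文的for !condition()循环)来避免这个问题。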

Cond的定义

// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
	"sync/atomic"
	"unsafe"
)
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
	// noCopy marks the struct for the -copylocks checker of `go vet`.
	noCopy noCopy
	// L is held while observing or changing the condition
	L Locker
	// notify is the ticket-based notification list used to implement sync.Cond.
	// Reference:
	// https://go.googlesource.com/go/+/go1.16.4/src/runtime/sema.go#446
	notify  notifyList
	// checker is consulted by Wait, Signal and Broadcast; its check method
	// panics when the Cond has been copied.
	checker copyChecker
}
// NewCond allocates a Cond whose associated Locker is l and returns
// a pointer to it.
func NewCond(l Locker) *Cond {
	c := new(Cond)
	c.L = l
	return c
}
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
// Implementation order: Wait first adds the calling goroutine to the
// notify list, then releases c.L, then blocks. Once another goroutine
// calls Broadcast or Signal and execution resumes, Wait locks c.L again.
func (c *Cond) Wait() {
	// check panics with "sync.Cond is copied" if c was copied after first use.
	c.checker.check()
	// notifyListAdd adds the caller to the notify list so that it can
	// receive notifications. The caller must eventually call notifyListWait
	// to wait for such a notification, passing the returned ticket t.
	// Reference:
	// https://go.googlesource.com/go/+/go1.16.4/src/sync/runtime.go#31
	// https://go.googlesource.com/go/+/go1.16.4/src/runtime/sema.go#475
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// notifyListWait waits for a notification. If one has been sent since
	// notifyListAdd was called, it returns immediately; otherwise it blocks.
	// Reference:
	// https://go.googlesource.com/go/+/go1.16.4/src/sync/runtime.go#34
	// https://go.googlesource.com/go/+/go1.16.4/src/runtime/sema.go#485
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
	// check panics with "sync.Cond is copied" if c was copied after first use.
	c.checker.check()
	// Ask the runtime to notify one waiter registered on c.notify.
	runtime_notifyListNotifyOne(&c.notify)
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
	// check panics with "sync.Cond is copied" if c was copied after first use.
	c.checker.check()
	// Ask the runtime to notify every waiter registered on c.notify.
	runtime_notifyListNotifyAll(&c.notify)
}
// copyChecker records its own address on first use so that a later copy
// of the value can be detected.
type copyChecker uintptr

// check panics with "sync.Cond is copied" when the address stored in c is
// neither zero nor c's own address. On first use (stored value zero) it
// records c's address.
func (c *copyChecker) check() {
	// Fast path: the stored value already equals our own address.
	if uintptr(*c) == uintptr(unsafe.Pointer(c)) {
		return
	}
	// First use: the stored value is still zero, so record our address.
	if atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) {
		return
	}
	// The swap failed because the stored value was not zero. Read it once
	// more in case it now equals our own address.
	if uintptr(*c) == uintptr(unsafe.Pointer(c)) {
		return
	}
	panic("sync.Cond is copied")
}
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
// Unlock is a no-op as well; it is declared alongside Lock.
func (*noCopy) Unlock() {}

使用例子

可以按照官方给的使用例子:

//    c.L.Lock()
//    必须在持有c.L的情况下循环检查条件:只有condition()不成立时才调用Wait。
//    如果不检查条件就直接Wait,Wait可能发生在唤醒之后,通知已经丢失,
//    goroutine会一直阻塞,从而导致死锁
//    for !condition() {  
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()

如:

package main

import (
    "fmt"
    "sync"
    "time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
var condition = false

// main starts ten goroutines that block on cond until condition becomes
// true, then sets condition under the lock, broadcasts, and waits for all
// of them to print their index.
func main() {
	var wg sync.WaitGroup
	wg.Add(10)

	// waiter blocks on cond while condition is false, then prints x.
	waiter := func(x int) {
		cond.L.Lock()         // acquire the lock
		defer cond.L.Unlock() // release the lock on return
		for !condition {
			cond.Wait()
		}
		fmt.Println(x)
		wg.Done()
	}
	for i := 0; i < 10; i++ {
		go waiter(i)
	}

	time.Sleep(3 * time.Second)

	// Change the condition while holding the lock, then wake every waiter.
	cond.L.Lock()
	condition = true
	cond.Broadcast()
	cond.L.Unlock()

	wg.Wait()
}

注意:使用sync.Cond时,开发人员需要自己保证加锁、解锁的时机以及condition()条件判断的正确性,否则比较容易发生死锁。

其他代码中的应用

k8s中的client-go代码中的DeltaFIFO队列实现就是用了sync.Cond来实现:

// DeltaFIFO is shown here as an excerpt: only the lock and the condition
// variable are listed.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// Other fields omitted.
}

初始化

// NewDeltaFIFO builds a DeltaFIFO from a key function and a known-objects
// lister by delegating to NewDeltaFIFOWithOptions.
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	opts := DeltaFIFOOptions{
		KeyFunction:  keyFunc,
		KnownObjects: knownObjects,
	}
	return NewDeltaFIFOWithOptions(opts)
}

// NewDeltaFIFOWithOptions builds a DeltaFIFO from opts. When
// opts.KeyFunction is nil, MetaNamespaceKeyFunc is used instead. The
// returned queue's cond uses the queue's own lock as its Locker.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	keyFn := opts.KeyFunction
	if keyFn == nil {
		keyFn = MetaNamespaceKeyFunc
	}

	fifo := &DeltaFIFO{
		items:        make(map[string]Deltas),
		queue:        []string{},
		keyFunc:      keyFn,
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	// fifo.lock and fifo.cond are used together: the lock is the cond's Locker.
	fifo.cond.L = &fifo.lock
	return fifo
}

再来看看Pop方法,何时调用Wait:

// Pop blocks until the queue has at least one item, then removes the first
// queued id, passes its deltas to process and returns them. If several
// items are ready, they are returned in the order in which they were
// added/updated.
//
// If the queue is empty and f.closed is set, Pop returns (nil, ErrFIFOClosed).
// If process returns an ErrRequeue, the item is re-added via addIfNotPresent
// and the wrapped e.Err is returned. process runs while f.lock is held.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		// No need to Wait while the queue holds data.
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			// Block until the cond is notified; the loop then re-checks
			// both the queue length and f.closed.
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

何时调用Broadcast:

// Close the queue: under f.lock it sets f.closed and broadcasts on f.cond,
// so goroutines blocked in f.cond.Wait wake up and can observe f.closed.
func (f *DeltaFIFO) Close() {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.closed = true
	f.cond.Broadcast()
}

// addIfNotPresent stores deltas under id and appends id to the queue, but
// only when id has no entry in f.items yet. It always marks the queue as
// populated. The caller must already hold the fifo lock.
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
	f.populated = true
	if _, found := f.items[id]; !found {
		f.queue = append(f.queue, id)
		f.items[id] = deltas
		// New data is available: wake the goroutines blocked on f.cond.
		f.cond.Broadcast()
	}
}

// queueActionLocked appends a Delta{actionType, obj} to the delta list kept
// for obj's key, de-duplicates the list, enqueues the key if it had no
// entry yet, and broadcasts on f.cond.
// It returns a KeyError when the key cannot be computed.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	key, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	prev := f.items[key]
	merged := dedupDeltas(append(prev, Delta{actionType, obj}))

	if len(merged) == 0 {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if prev == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", key, prev, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", key, prev, obj)
		f.items[key] = merged
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", key, prev, obj)
	}

	// Only keys without an existing entry are appended to the queue.
	if _, queued := f.items[key]; !queued {
		f.queue = append(f.queue, key)
	}
	f.items[key] = merged
	f.cond.Broadcast()
	return nil
}

更新于 2021/05/16 23:53