本文共 6769 字,大约阅读时间需要 22 分钟。
条件变量(conditional variable),和互斥锁一样,也是一个同步工具。我们常常会把条件变量与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。
条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。
使用条件变量的最大优势就是在效率方面的提升。当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了。条件变量需要与互斥锁配合使用。条件变量的初始化需要互斥锁,并且它的方法有的也是基于互斥锁的。
条件变量提供的方法有三个:在利用条件变量等待通知的时候,需要在它基于的那个互斥锁的保护下进行。
在进行单发通知或光爆通知的时候,需要在对应的互斥锁解锁之后再做操作。创建条件变量
结合代码理解上面的含义,先创建几个变量:var lock sync.RWMutexsendCond := sync.NewCond(&lock)recvCond := sync.NewCond(lock.RLocker())
条件变量的类型
lock是一个读写锁,基于这把锁,创建了2个代表条件变量的变量,这两个变量的类型是*sync.Cond,是由sync.NewCond函数来初始化的。初始化
与互斥锁锁不同,这里不是开箱即用的,只能使用sync.NewCond函数来创建它的指针值,这个函数需要一个sync.Locker类型的参数。 前面说过,条件变量是基于互斥锁的,它必须有互斥锁的支持才能够起作用。因此,这里的参数是必须的,它也会参与到条件变量的方法实现中去。 sync.Locker接口 sync.Locker其实是一个接口,包含两个方法Lock()和Unlock():type Locker interface { Lock() Unlock()}
sync.Mutex类型sync,RWMutex类型都拥有这两个方法,不过都是指针方法。因此这两个类型的指针类型才是sync.Locker接口的实现类型。
初始化的过程
在为sendCond初始化的时候,把lock变量的指针作为参数。这里lock变量的Lock方法和Unlock方法分别用于对其中写锁的锁定和解锁。这里与实现接口的两个方法的名称是对应的。 在为recvCond初始化的时候,需要的是lock变量的读锁,并且还得是sync.Locker接口类型,就是要实现了Lock和Unlock方法的读锁。可是lock变量中用于读锁的方法却是RLock方法和RUnlock方法,这里名称不对应了。不过有一个RLocker方法可以实现这一需求,下面是源码里实现的部分,很简单:// RLocker returns a Locker interface that implements// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.func (rw *RWMutex) RLocker() Locker { return (*rlocker)(rw)}type rlocker RWMutexfunc (r *rlocker) Lock() { (*RWMutex)(r).RLock() }func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
这里我有一些小疑惑,3个方法里面都是类型断言吧。RLocker方法把原来的读写锁类型转成一个新的类型然后返回。后面的两个方法,为了用新类型调用读写锁类型里的方法,先进行类型断言,转成读写锁原本的类型,然后调用它的方法。
使用条件变量
下面是截取的使用时的部分代码:lock.Lock()for !isEmpty { sendCond.Wait()}isEmpty = false// 这里可以做写入的操作lock.Unlock()recvCond.Signal()
上面是一个写入的流程。之前的代码定义了一个状态变量isEmpty,只有状态为空的时候,才允许写入,写入后把状态设置为非空。
这里要先调用Lock方法,等待通知(wait)是要在互斥锁的保护下进行的。 然后再操作完之后,先调用Unlock方法,再发送通知,发送通知的操作要在互斥锁解锁之后。 这里等待的出sendCond的信号,而最后发送的是recvCond的信号。在另一个读取的流程里则正好相反。利用条件变量可以实现单向的通知,而这里要实现双向的通知,就需要两个条件变量。这是条件变量的基本使用原则。上面把关键的代码分析了一下,下面是完整的示例代码:
package mainimport ( "fmt" "sync" "time" "flag")var useCond boolfunc init() { flag.BoolVar(&useCond, "cond", false, "是否使用条件变量")}type msgBox struct { message string isEmpty bool sendCond *sync.Cond recvCond *sync.Cond}func main() { flag.Parse() fmt.Println("是否开启了条件变量保护:", useCond) var lock sync.RWMutex msgBox := msgBox{ isEmpty: true, // 默认值是false,状态初始值应该为true sendCond: sync.NewCond(&lock), // 不是开箱即用的,需要在使用前初始化 recvCond: sync.NewCond(lock.RLocker()), } done := make(chan struct{}) max := 5 // 写操作的goroutine go func(max int) { defer func() { done <- struct{}{} }() for i := 0; i < max; i++ { time.Sleep(time.Millisecond * 200) // 先进行保护 lock.Lock() // 再等待通知 for useCond && !msgBox.isEmpty { msgBox.sendCond.Wait() } msgBox.isEmpty = false msg := fmt.Sprintf("第 %d 条消息", i) msgBox.message = msg fmt.Printf("发送消息[%d]: %s\n", i, msg) // 先解锁 lock.Unlock() // 再发送通知 msgBox.recvCond.Signal() } }(max) // 读操作的goroutine go func(max int) { defer func() { done <- struct{}{} }() for j := 0; j < max; j++ { time.Sleep(time.Millisecond * 500) lock.RLock() for useCond && msgBox.isEmpty { msgBox.recvCond.Wait() } msgBox.isEmpty = true msg := msgBox.message fmt.Printf("接收消息[%d]: %s\n", j, msg) lock.RUnlock() msgBox.sendCond.Signal() } }(max) <-done <-done fmt.Println("Over")}
代码中条件变量的作用
在这个例子里,写的时候要获取到写锁,读的时候要获取到读锁,这个逻辑和之前互斥锁是一样的。但是只是获取到锁还不能做操作,这里还要再做一个限制,所以就用到了条件变量。 在这个例子里,写操作和读操作是需要成对出现的。写完一次之后,依然能获取到写锁,但是不能立刻写。而是要等待读操作把之前写入的数据读过之后,才能再次写入,把之前的内容覆盖掉。读操作也是一样。这里就需要两个goroutine之间传递信号了。 通过命令行参数分别在开启/关闭条件变量的环境下运行,可以看到其中的作用:go run main.gogo run main.go -cond
条件变量的Wait方法主要做了4件事:
先解锁,在阻塞
在Wait方法里,必须要先解锁,在阻塞当前goroutine。否则就违背了互斥锁要成对出现的原则。并且当前goroutine在解锁千就阻塞的话,当前goroutine就不可能在执行解锁了。即使不考虑原则,让别的goroutine来解锁,又会有重复解锁可能。使用for语句
并且Wait方法建议是放在一个for循环里的。这里似乎也是可以用if语句的。但是if语句只能检查状态一次,而for的话可以进行多次检查。如果goroutine收到了通知而唤醒,但是此时检查时发现状态还是不对,那么就应该再次调用Wait方法。保险起见,在包裹条件变量的Wait方法总是应该使用for语句。这2个方法都是用来发送通知的。Signal方法的通知只会唤醒一个goroutine,而Broadcast方法的通知会唤醒所有等待的goroutine。Wait方法会把当前的goroutine添加到通知队列的队尾,而Signal方法会从通知队列的队首开始查找可以被唤醒的goroutine。因此Signal方法唤醒的一般是最早等待的那个goroutine。
适用场景
这2个方法的行为决定他们的适用场景。确定只有一个goroutine在等待通知,或者值需要唤醒一个goroutine的时候,就使用Signal方法。否则,使用Broadcast方法总是没错的,Broadcast方法的适用场景更多。通知的即时性
条件变量的通知具有即时性。如果发送通知的时候没有goroutine在等待,那么该次通知就会被直接丢弃。之后再开始等待的goroutine需要等待之后的通知。还是前面那个示例,稍微改了改,把读写锁换成了互斥锁,通知方法把Signal换成了Broadcast:
package mainimport ( "fmt" "sync" "time")var lock sync.Mutex// 匿名结构体,定义并初始化赋值// 嵌入式锁(Embedded lock)的场景适合使用匿名结构体var msgBox = struct { message string isEmpty bool sendCond *sync.Cond recvCond *sync.Cond}{ isEmpty: true, sendCond: sync.NewCond(&lock), recvCond: sync.NewCond(&lock),}// 用于设置消息的函数func send(id, index int) { lock.Lock() for !msgBox.isEmpty { msgBox.sendCond.Wait() } msg := fmt.Sprintf("msg: [%d-%d]", id, index) msgBox.message = msg fmt.Printf("发送消息[%d-%d]: %s\t", id, index, msg) msgBox.isEmpty = false lock.Unlock() msgBox.recvCond.Broadcast()}// 用于读取消息的函数func recv(id, index int) { lock.Lock() for msgBox.isEmpty { msgBox.recvCond.Wait() } msg := msgBox.message msgBox.message = "" fmt.Printf("接收消息[%d-%d]: %s\n", id, index, msg) msgBox.isEmpty = true lock.Unlock() msgBox.sendCond.Broadcast()}func main() { done := make(chan struct{}) count := 5 // 启动一个goroutine用于发送 go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 100) send(id, i) } }(0, count * 2) // 启动两个goroutine用于接收 go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 300) recv(id, i) } }(1, count) go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 400) recv(id, i) } }(2, count) <- done <- done <- done fmt.Println("Over")}
转载于:https://blog.51cto.com/steed/2346930