前言

在上一篇文章中,我们一起学习了如何使用 Go 中的互斥锁 Mutex,那么本篇文章,我们就一起来探究下 Mutex 底层是如何实现的,知其然,更要知其所以然!

说明:本文中的示例,均是基于Go1.17 64位机器

Mutex 特性

Mutex 就是一把互斥锁,可以想象成一个令牌,有且只有这一个令牌,只有持有令牌的 goroutine 才能进入房间(临界区),在房间内执行完任务后,走出房间并把令牌交出来,如果还有其余的 goroutine 等着获取这个令牌,让他们再去抢这个令牌,抢到的重复上述过程,没抢到的继续等。

上述是从宏观角度来看待互斥锁的,但是在 Mutex 内部,有着非常复杂的抢锁逻辑,Mutex 的发展也经历了几个版本,我们可以用拿令牌进餐厅吃饭来形象比喻下几个主要版本的变化。

前提:餐厅一次只能进入一个人,餐厅有一个令牌,只有持有这个令牌的人才能进去;从餐厅出来后,需要把这个令牌归还

版本一

餐厅在门外设置了一个队伍,如果令牌空闲,拿着令牌去餐厅用餐;如果令牌不是空闲的,新来的人就要去队伍后面排队等待叫号。(不是空闲包含两种情况:持有令牌的人在餐厅里面,队伍是空的;队伍有人排队。)

此版本的问题就是:只要令牌不是空闲的,新来的人必须直接去排队,没有商量的余地。这样看起来很公平,遵循先来后到的原则,但是对于餐厅来说,营业效率就会有所降低,即单位时间内接待顾客的数量(IO)会减少。为什么这样说呢,举个例子,有个顾客从餐厅出来归还令牌后,需要去等待队列去叫号,被叫到号的这个人需要花费时间走到餐厅(获取到CPU),这中间就浪费了不少时间。

版本二

为了提高营业效率,允许刚到门口的顾客和被叫到号的顾客一起去抢令牌,而不是直接去排队,这样就给了新人机会。举个例子:当持有令牌的人从餐厅出来归还令牌后,去等待队列叫个号,如果此时有顾客刚到门口,被叫到号的和新到的顾客一起抢令牌,抢到的就可以直接进入餐厅,抢不到的接着去排队,由于刚到的顾客离门口近(正在占据CPU),被叫到号的顾客离得远(需要等CPU),而且刚到的顾客可能不只一个,所以被叫到号的顾客很大概率抢不到令牌,可能还没走到门口(还没获取到CPU)就被新来的顾客抢走了。不管怎么样,这样提高了餐厅的效率,可以在单位时间内接待更多的客户。

版本三

餐厅发现有些人用餐很快,如果让抢不到令牌的先别直接去排队,而是在门口转悠会(当然不能一直转悠,有条件限制,到了限制还是要去排队),这种方式类似乐观锁,那么有顾客从餐厅出来后,就不用去叫号了,直接让门口的这些顾客继续抢就行了,这样就进一步提高了餐厅的运行效率,毕竟叫号真的太浪费时间了。

版本四

经过了多个版本的优化,餐厅的运营效率是越来越高了,但是有些人可要准备要骂娘了,这些人是谁呢,当然是已经在队伍里等待的那些人。由于给了新人机会,如果持续有新顾客来,那么已经在队伍里的那些人永远也拿不到令牌,可真的要饿死了。

Mutex 在这个版本只为三件事:公平、公平、还是tm的公平!坚持让每一个人都不饿肚子的原则,餐厅搞出了一个新的模式:饥饿模式。如果有顾客等的时间超过了阈值(1ms),餐厅变为饥饿模式,在该模式下,所有新来的顾客直接去排队,然后按照先来先到的顺序,依次将令牌给等待队列队首的顾客。

那么什么时候由饥饿模式变为正常模式呢?当拿到令牌的顾客发现自己从等待到拿到令牌的时间小于阈值(1ms)了,或者等待队伍没人等了,此时餐厅就变为正常模式,毕竟上述两个条件都说明当前餐厅竞争不是很激烈了。

同时这个版本修复了以前的一个问题:之前从等待队列唤醒的顾客如果没有抢到令牌,再回到队列后是插到队尾,这样对已经排到第一位的顾客太不友好了。在这个版本中修复了该问题,唤醒的顾客如果没有抢到令牌,直接插入到队首,下次叫号还是他。

特性总结

经过了多次迭代,目前的版本有了如下特性:

给新人机会:让刚来的顾客和从队列唤醒的顾客一起去抢令牌,唤醒也是按照先来先到的原则唤醒;

保持乐观态度:没抢到不是直接去排队,而是可以在门口转悠会,说不定里面的人马上就出来了;

正常模式和饥饿模式的切换:为了公平起见,正常模式下给了新人机会,一起去抢令牌;饥饿模式下照顾老人,所有人老老实实排队,按照先来先到的顺序拿令牌。整个餐厅既保持了公平,又提高了运行效率,一切井然有序起来了。

回归正题

让我们从餐厅回到 Go 中来,Mutex 有两种模式:正常模式和饥饿模式:

正常模式下,如果当前锁正在被持有,抢不到锁的就会进入一个先进先出的等待队列。当持有锁的 goroutine 释放锁之后,按照从前到后的顺序唤醒等待队列的第一个等待者,但是不会直接给被唤醒者锁,还是需要他去抢,即在唤醒等待队列等待者这个时间,同时也会有正在运行且还未进入等待队列的 goroutine 正在抢锁 (数量可能还很多),这些都会和刚唤醒的等待者一起去抢,刚唤醒的可能还没有分到 CPU,而正在运行的正在占据了CPU,所以正在运行的更有可能获取到锁,被唤醒的等待者可能抢锁失败。如果等待者抢锁失败,他会被放到等待队列的队首,如果超过 1ms 都没抢到锁,就会从 正常模式 切换到 饥饿模式

饥饿模式下,要释放锁的 goroutine 会将锁直接交给等待队列的第一个等待者,不需要去抢了,而且新来的 goroutine 也不会尝试去抢锁,直接加入到等待队列的尾部。那么什么时候会从饥饿模式切换到正常模式呢:

(1)如果当前被唤醒的等待者获得到锁后,发现自己是队列中的最后一个,队列中没有其他等待者了,此时会切换到正常模式

(2)如果当前被唤醒的等待者获得到锁后,发现自己总共的等待时间不超过 1ms,就获得到锁了,此时也会切换到正常模式

正常模式会带来更高的吞吐量:一个 goroutine 要释放锁,更大可能会被正在运行的 goroutine 抢到,这就避免了协程的上下文切换,运行更多的 goroutine,但是有可能造成一个问题,就是锁始终被新来的 goroutine 抢走,在等待队列中的等待者始终抢不到锁,这就会导致饥饿问题。饥饿模式就是为了解决这个问题出现的,保证了每个 goroutine 都有运行的机会,防止等待时间过长。

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 互斥锁
type Mutex struct {
	state int32		// 状态
	sema  uint32  // 信号量
}


const (
	mutexLocked = 1 << iota // 1
	mutexWoken // 2
	mutexStarving // 4
	mutexWaiterShift = iota // 3

	starvationThresholdNs = 1e6 // 判断是否要进入饥饿状态的阈值
)

信号量 sema 就相当于我们说的令牌,state 是 int32 类型,一共 32位,通过每个位记录了当前的状态:

state字段

mutexLocked:当前是否已经上锁,state & mutexLocked = 1 表示已经上锁;

mutexWoken:标记当前是否有唤醒的 goroutine,state & mutexWoken = 1 表示有唤醒的goroutine;

mutexStarving:当前是否为饥饿状态,state & mutexWoken = 1 表示处于饥饿状态;

mutexWaiterShift:29位,state >> mutexWaiterShift 得到等待者的数量;

Lock()

Lock()加锁方法分为两部分,第一部分是 fast path,可以理解为快捷通道,如果当前锁没被占用,直接获得锁返回;否则需要进入 slow path,判断各种条件去竞争锁,主要逻辑都在此处。

了解过原子操作的同学,对 CompareAndSwap(CAS) 应该不陌生,CompareAndSwapInt32(addr *int32, old, new int32) 有三个参数,如果地址 addr 指向的值与 old 相等,则将 addr 的值改为 new,否则不变,也就是说在我们修改前,如果有人修改了 addr 指向的值,本次修改就会失败。

1
2
3
4
5
6
7
8
9
// 上锁
func (m *Mutex) Lock() {
	// fastpath:期望当前锁没有被占用,可以快速获取到锁, CAS 修改 state 最后一位的值为1(标记锁是否被占用)
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// Slow path : 单独抽出来放到一个函数里,方便 fast path 被内联
	m.lockSlow()
}
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
func (m *Mutex) lockSlow() {
	var waitStartTime int64 // // 记录等待时间
	starving := false // 当前的 goroutine 是否已经饥饿了(如果已经饥饿,就会将 state 的饥饿状态置为 1)
	awoke := false  // 当前的 goroutine 是否被唤醒的
	iter := 0	// 自旋次数
	old := m.state // 保存当前的 state 状态
	for {
    
    /*
    自旋:如果满足如下条件,就会进入 if 语句,然后 continue,不断自旋:
    1. 锁被占用,且不处于饥饿模式(饥饿状态直接去排队,不允许尝试获取锁)
    2. 基于当前自旋的次数,再次自旋有意义 runtime_canSpin(iter)
    
    那么退出自旋的条件也就是:
    1. 锁被释放了,当前处于没被占用状态(说明等到了,该goroutine就会立即去获取锁)
		2. mutex进入了饥饿模式,不自旋了,没意义(饥饿状态会直接把锁交给等待队列队首的goroutine)
		3. 不符合自旋状态(自旋次数太多了,自旋失去了意义)
    
    如下代码是位操作:
    mutexLocked|mutexStarving = 00000...101
    mutexLocked = 00000...001
    如果要满足 old & 00000...101 = 00000...001,需要 old = ...0*1,即状态为:锁被占用,且不处于饥饿状态 
    
    runtime_canSpin(iter) 会根据自旋次数,判断是否可以继续自旋
    */
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			
      /*
      如果 
      	1. 当前goroutine不是被唤醒的 (awoke=false) 
      	2. 锁状态唤醒标志位为0(old&mutexWoken == 0) 
      	3. 等待者数量不为0 (old>>mutexWaiterShift != 0  右移三位得到的就是等待者数量)
      
      	那么利用CAS,将 state 的唤醒标记置为1,标记自己是被唤醒的 (将state的唤醒标记置为1,说明外面有唤醒着的goroutine,那么在释放锁的时候,就不去等待队列叫号了,毕竟已经有唤醒的了)
      	
      	如果有其他 goroutine 已经设置了 state 的唤醒标记位,那么本次就会失败
      */

			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
      
      // 迭代次数加一
			iter++
      
      // 获取最新的状态
			old = m.state
      
      // 想再次自旋,看看锁释放了没
			continue
		}
    
    // 到这里,说明退出了自旋,当前锁没被占用 或者  系统处于饥饿模式 或者 自旋次数太多导致不符合自旋条件
  
    // new 代表当前goroutine 基于当前状态要设置的新状态
		new := old
    
    // 只要不是饥饿状态,就需要获取锁(饥饿状态直接去排队,不能抢锁)
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
    
    // 锁被占用 或者 处于饥饿模式下,新增一个等待者
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
    
    // 当前 goroutine 已经进入饥饿了,且锁还没有释放,需要把 Mutex 的状态改为饥饿状态
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
    
    // 如果是被唤醒的,把唤醒标志位置0,表示外面没有被唤醒的goroutine了(抢到就获得锁、抢不到就睡眠,把唤醒标志置0)
		if awoke {
			
      // 由于是被唤醒的,new 里面的 唤醒标记位一定是 1
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
      
      // a &^ b 的意思就是 清零a中,ab都为1的位,即清除唤醒标记
			new &^= mutexWoken
		}
    
    /*
      利用CAS,将状态设置为新的
      1. 如果是饥饿状态,只增加一个等待者数量
      2. 正常状态,加锁标记置为 1,如果锁已被占用增加一个等待者数量
      3. 如果当前 goroutine 已经饥饿了,将 饥饿标记 置为 1
      4. 如果是被唤醒的,清除唤醒标记
    */
    
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
      
      // 如果改状态之前,锁未被占用 且 处于正常模式,那么就相当于获取到锁了
			if old&(mutexLocked|mutexStarving) == 0 {
				break 
			}
      
      // 到这里说明:1. 之前锁被占用  或者 2.之前是处于饥饿状态 
      
      // 判断之前是否等待过(是否从队列里唤醒的),之前等待过,再次排队放在队首
			queueLifo := waitStartTime != 0
      
      // 如果之前没等过(新来的),设置等待起始时间当前时间
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
      
     // 之前排过队的老人,放到等待队列队首;新人放到队尾,然后等待获取信号量
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      
      // 锁被释放,goroutine 被唤醒
      
      // 设置当前 goroutine 饥饿状态,如果之前已经饥饿,或者距离等待开始时间超过了 1ms,也变饥饿
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      
      // 获取最新的状态
			old = m.state
      
      // 如果 state 饥饿标记为1,说明当前在饥饿模式,饥饿模式下被唤醒,已经获取到锁了;
      // 饥饿状态下,释放锁没有更新等待者数量和饥饿标记,需要获得锁的goroutine去更新状态
			if old&mutexStarving != 0 {

        // 正确性校验:
        // 1. 锁还是锁住的状态(锁已经释放给当前goroutine了,不应该被锁住)
				// 2. 或者有被唤醒的goroutine(饥饿模式下不应该有醒着的goroutine,都应该去乖乖等着)
				// 3. 或者当前goroutine 的等待者数量为0(当前goroutine就是等待者)
				// 这三种情况不应该出现,与预期状态不符
        
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
        
        // 加锁,减去一个等待者
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
        
        // 如果当前的 goroutine 非饥饿,或者等待者只有一个(也就是只有当前goroutine,等待队列空了),可以取消饥饿状态,进入正常状态
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
        
        // 修改状态:
        // 加锁,减去一个等待者: m.state + mutexLocked - 1<<mutexWaiterShift : 
        // 满足非饥饿条件,加锁,减去一个等待者,取消饥饿状态:
        // m.state + mutexLocked - 1<<mutexWaiterShift - mutexStarving: 
				atomic.AddInt32(&m.state, delta)
        
        // 饥饿模式下被唤醒,相当于获得锁了,可以结束
				break
			}
      
      // 之前是处于锁被占用且非饥饿状态,被唤醒,去继续抢锁
			awoke = true
      
      // 新唤醒的,自旋数量置0
			iter = 0
		} else {
      
      // 修改新状态失败,状态有更新,需要重试
			old = m.state
		}
	}
}

加锁的这部分代码,新来的 goroutine 或者从队列里面唤醒的 goroutine 都会进入如下逻辑,相当于给新人机会

  1. 乐观态度的自旋:判断是否可以自旋,如果可以自旋,就自旋等待;如果有可能,把唤醒标记位置为1,标记外面有唤醒的 goroutine,释放锁的时候就不会去队列里面唤醒了,毕竟已经有人在等待了;
  2. 修改系统状态:跳出自旋后,每个 goroutine 根据当前系统状态修改系统状态:
    1. 非饥饿状态,想要加锁(如果本来就是加锁状态,将加锁位 设置为 1 相当于不变)
    2. 锁被占用 或者 处于饥饿模式下,新增一个等待者
    3. 当前 goroutine 已经进入饥饿了,且锁还没有释放,需要把 Mutex 的状态改为饥饿状态
    4. 如果当前 goroutine 是被唤醒的,清除系统唤醒标记
  3. 利用 CAS 修改系统状态,同一时刻只有一个 goroutine 能够设置成功,但是设置成功并不代表获取到锁了:
    1. 之前是非上锁的正常状态,设置成功说明本次抢锁成功,可以返回去操作临界区了;
    2. 之前是上锁状态或者饥饿状态,本次只是新增了一个等待者,然后根据是否是新来的,去队列队尾或者队首排队,等待叫号;
  4. 从队列中被叫号唤醒,不一定是获取到锁了:
    1. 当前是饥饿状态,那么一定是获取到锁了,因为饥饿状态只把锁给队列的第一个 goroutine
    2. 非饥饿状态,将自己状态置为唤醒,再去抢锁,重复上述过程

问:系统会不会同时存在 唤醒标志和饥饿标志都为1 的情况呢?

答:不会。只有等待时间大于 1ms 的才会去设置饥饿标记,也就是只有从队列唤醒的才会去设置,那么从队列中唤醒的 goroutine ,自身的 awoke=true,每当去设置饥饿标记的时候会把唤醒标记清除。

Unlock()

Unlock()解锁方法也分为两部分,第一部分是 fast path,可以理解为快捷通道,直接把锁状态位清除,如果此时系统状态恢复到初始状态,说明没有 goroutine 在抢锁等锁,直接返回,否则进入 slow path

slow path 会根据是否为饥饿状态,做出不一样的反应:

  • 正常状态:唤醒一个 goroutine 去抢锁,等待者数量减一,并将唤醒状态置为 1
  • 饥饿状态:直接唤醒等待队列队首的 goroutine,锁的所有权直接移交(修改等待者数量、是否取消饥饿标记,由唤醒的 goroutine 去处理)。
1
2
3
4
5
6
7
8
9
func (m *Mutex) Unlock() {

	// Fast path: 把锁标记清除
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// 清除完锁标记,发现还有其他状态,比如等待队列不为空,需要唤醒其他 goroutine
		m.unlockSlow(new)
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (m *Mutex) unlockSlow(new int32) {
  
  /* 状态正确性校验:
  	 1. 如果解锁一个上锁状态的锁,最后一位则为1,fast path 中 new 已经减去了1, 此时 new 最后一位应当为0
		 2. 如果解锁一个未上锁状态的锁,最后一位则为0,fast path 中 new 已经减去了1, 此时 new 最后一位应当为1
	 	 如果 (new+mutexLocked)&mutexLocked == 0,说明 new 当前最后一位是1,那么就是解锁了一个没有上锁的锁,状态有误
	*/
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
  
  // 正常模式,非饥饿,可能需要唤醒队列中的 goroutine,饥饿状态直接移交锁
	if new&mutexStarving == 0 {
		old := new
		for {

      /* 系统运转正常,锁可以正确交接,可以直接返回了:
        1. 没有等待者了 (没有等锁的了,去唤醒谁?)
        2. 有唤醒状态的 goroutine  (自旋状态的 goroutine,将唤醒状态置为1)
      	3. 有 goroutine 已经获取了锁 (Unlock方法已经将锁标记置为了0,可能自旋的此时已经抢到了锁)
      */
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
      
      // 没有唤醒状态的 goroutine,唤醒一个去抢锁
			// 减去一个等待者,并且将 唤醒标记 置为 1
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 第二个参数为false, 唤醒队首的 goroutine 去抢锁(不一定能抢到)
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
      
      // 上面 CAS 失败,可能由于新增了一个等待者,for 循环重试
			old = m.state
		}
	} else {
		
    /*
     1. 第二个参数为 true,直接将锁的所有权,交给等待队列的第一个等待者
		 2. 注意,此时没有设置 mutexLocked =1 ,被唤醒的 goroutine 会设置
		 3. 虽然没有设置 mutexLocked ,但是饥饿模式下, Mutex 始终被认为是锁住的,都会直接排队等待移交锁
    */
		runtime_Semrelease(&m.sema, true, 1)
	}
}

总结

本篇文章首先通过餐厅的示例,形象的介绍了 Mutex 的运行特性,然后对源码进行逐行分析,学习了 Mutex 加锁 Lock() 以及 解锁 Unlock()是如何实现的。如果本篇文章对有所帮助,点个关注 + 转发哦 ^_^

更多

微信公众号:CodePlayer