0%

Linux并发与同步(三)互斥锁

原图

概述

互斥锁类似于 count值等于1的信号量,为什么内核社区要重新开发互斥锁,mutex有如下优点:

  • mutex数据结构的定义比信号量小
  • 互斥锁相对于信号量要简单轻便一些,在锁争用激烈的测试场景下,互斥锁比信号量执行速度更快
  • 可扩展性更好

mutex数据结构

下面来看 mutex数据结构的定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<include/linux/mutex.h>
struct mutex {
atomic_long_t owner;
raw_spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
  • wait_lock:自旋锁,用于保护 wait_list 睡眠等待队列
  • wait_list:用于管理所有在互斥锁上睡眠的进程,没有成功获取锁的进程会在此链表上睡眠
  • owner: Linux4.10内核把原来的 count成员和 owner成员合并成一个。原来的count是一个原子值,1表示锁没有被持有,0表示锁被持有,负数表示锁被持有且有等待者在排队。现在新版本的 owner中,0表示锁没有未被持有,非零值则表示锁持有者的task_struct指针的值。另外,最低3位有特殊的含义
1
2
3
4
5
#define MUTEX_FLAG_WAITERS	0x01
#define MUTEX_FLAG_HANDOFF 0x02
#define MUTEX_FLAG_PICKUP 0x04

#define MUTEX_FLAGS 0x07
  • osq:用于实现MCS锁机制 MUTEX_FLAG_WAITERS:表示互斥锁的等待队列里有等待者,解锁的时候必须唤醒这些等候的进程。 MUTEX_FLAG_HANDOFF:对互斥锁的等待队列中的第一个等待者会设置这个标志位,锁持有者在解锁的时候把锁直接传递给第一个等待者。 MUTEX_FLAG_PICKUP:表示锁的传递已经完成。

互斥锁实现了乐观自旋( optimistic spinnig)等待机制。准确地说,互斥锁比读写信号量更早地实现了自旋等待机制。自旋等待机制的核心原理是当发现锁持有者正在临界区执行并且没有其它优先级高的进程要调度时,当前进程坚信锁持有者会很快离开临界区并释放锁,因此与其睡眠等待不如乐观地自旋等待,以减少陲眠唤醒的开销。在实现自旋等待机制时,内核实现了一套MCS锁机制来保证只有一个等待者自旋等待锁持有者释放锁。

互斥锁的快速通道

互斥锁的初化有两种方式,一种是静态使用DEFINE_MUTE宏,另一种是在内核代码中动态使用mutex_init()函数。

1
2
3
4
5
6
7
8
9
#define __MUTEX_INITIALIZER(lockname) \
{ .owner = ATOMIC_LONG_INIT(0) \
, .wait_lock = __RAW_SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
__DEBUG_MUTEX_INITIALIZER(lockname) \
__DEP_MAP_MUTEX_INITIALIZER(lockname) }

#define DEFINE_MUTEX(mutexname) \
struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

下面来看mutex_lock()函数是如何实现的。

1
2
3
4
5
6
7
8
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();

if (!__mutex_trylock_fast(lock))
__mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);

__mutex_trylock_fast()函数判断是否可以快速获取锁若不能通过快速通道获取锁,那么要进入慢速通道 __mutex_lock_slowpath().

1
2
3
4
5
6
7
8
9
10
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
unsigned long curr = (unsigned long)current;
unsigned long zero = 0UL;

if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
return true;

return false;
}

__mutex_trylock_fast()函数实现的重点是 atomic_long_try_cmpxchg_acquire()函数,如果以cmpxchg()函数的语义来理解,会得出错误的结论。比如,当lock->owner和zero相等时,说明lock这个锁没有被持有,那么可以成功获取锁,把当前进程的 task_struct->curr的值赋给lock->owner,然后函数返回lock->owner的旧值,也就是0。这时if判断语句应该判断atomic_long_try_cmpxchg_acquire()函数是否返回0オ对。但是在 Linux5.0内核的代码里和我们想的完全相反,那是怎么回事呢? 细心的读者可以通过翻阅 Linux内核的git日志信息找到答案。在 Linux4.18内核中有个优化的补丁。锁的子系统维护者 Peter Zijlstra通过比较反汇编代码发现在x86_64架构下使用try_cmpxchg()代替cmpxchg()数可以少执行ー次test指令。try_cmpxchg()函数的核心还是调用 cmpxchg()函数,但是返回值发生了变化,它返回一个布尔值,表示 cmpxchg()函数的返回值是否和第二个参数的值相等。 因此,当原子地判断出lock->owr字段为0时,说明锁没有被进程持有,那么可以进入快速通道以迅速获取锁,把当前进程的task_struct指针的值原子设置到lock->owner字段中。若lock->owner字段不为0,则说明该锁已经被进程持有,那么要进入慢速通道__mutex_lock_slowpath()。

互斥锁的慢速通道

__mutex_lock_slowpath()函数调用__mutex_lock()-->__mutex_lock_common()函数来实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* Lock a mutex (possibly interruptible), slowpath:
*/
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, unsigned int state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
struct ww_mutex *ww;
int ret;

if (!use_ww_ctx)
ww_ctx = NULL;

might_sleep();

MUTEX_WARN_ON(lock->magic != lock);

ww = container_of(lock, struct ww_mutex, base);
if (ww_ctx) {
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;

if (ww_ctx->acquired == 0)
ww_ctx->wounded = 0;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
nest_lock = &ww_ctx->dep_map;
#endif
}

preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, NULL)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (ww_ctx)
ww_mutex_set_context_fastpath(ww, ww_ctx);
preempt_enable();
return 0;
}

raw_spin_lock(&lock->wait_lock);
/*
* After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) {
if (ww_ctx)
__ww_mutex_check_waiters(lock, ww_ctx);

goto skip_wait;
}

debug_mutex_lock_common(lock, &waiter);
waiter.task = current;
if (use_ww_ctx)
waiter.ww_ctx = ww_ctx;

lock_contended(&lock->dep_map, ip);

if (!use_ww_ctx) {
/* add waiting tasks to the end of the waitqueue (FIFO): */
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
} else {
ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
if (ret)
goto err_early_kill;
}

set_current_state(state);
for (;;) {
bool first;

if (__mutex_trylock(lock))
goto acquired;

if (signal_pending_state(state, current)) {
ret = -EINTR;
goto err;
}

if (ww_ctx) {
ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
if (ret)
goto err;
}

raw_spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();

first = __mutex_waiter_is_first(lock, &waiter);

set_current_state(state);
if (__mutex_trylock_or_handoff(lock, first) ||
(first && mutex_optimistic_spin(lock, ww_ctx, &waiter)))
break;

raw_spin_lock(&lock->wait_lock);
}
raw_spin_lock(&lock->wait_lock);
acquired:
__set_current_state(TASK_RUNNING);

if (ww_ctx) {
if (!ww_ctx->is_wait_die &&
!__mutex_waiter_is_first(lock, &waiter))
__ww_mutex_check_waiters(lock, ww_ctx);
}

__mutex_remove_waiter(lock, &waiter);

debug_mutex_free_waiter(&waiter);

skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);

if (ww_ctx)
ww_mutex_lock_acquired(ww, ww_ctx);

raw_spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;

err:
__set_current_state(TASK_RUNNING);
__mutex_remove_waiter(lock, &waiter);
err_early_kill:
raw_spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, ip);
preempt_enable();
return ret;
}

将其绘制出一张图如下:

乐观自旋等待机制

乐观自旋等待机制是互斥锁的一个新特性。乐观自旋等待机制其实就是判断锁持有者正在临界区执行时,可以断定锁持有者会很快退出临界区并且释放锁,与其进入睡眠队列,不如像自旋锁一样自旋等待,因为睡眠与唤醒的代价可能更高。乐观自旋等待机制的实现在mutex_optimistic_spin()函数里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static __always_inline bool
mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
struct mutex_waiter *waiter)
{
if (!waiter) {
if (!mutex_can_spin_on_owner(lock))
goto fail;

if (!osq_lock(&lock->osq))
goto fail;
}

for (;;) {
struct task_struct *owner;

/* Try to acquire the mutex... */
owner = __mutex_trylock_or_owner(lock);
if (!owner)
break;

if (!mutex_spin_on_owner(lock, owner, ww_ctx, waiter))
goto fail_unlock;

cpu_relax();
}

if (!waiter)
osq_unlock(&lock->osq);

return true;
...

return false;
}

采用一张图来表示如下:

详细内容不做分析。

读写锁

上述介绍的信号量有一个明显的缺点--没有区分临界区的读写属性。读写锁通常允许多个线程并发地读访问临界区,但是写访问只限制于一个线程。读写锁能有效地提高并发性,在多处理器系统中允许有多个读者同时访问共享资源,但写者是排他性的,读写锁具有如特性。

  • 允许多个读者同时进入临界区,但同一时刻写者不能进入
  • 同一时刻只允许一个写者进入临界区
  • 读者和写者不能同时进入临界区

读写锁有两种,分别是读者自旋锁类型和读者信号量。自旋锁类型的读写锁数据结构定义在 include/linux/rwlock_types.h头文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<include/linux/rwlock_types.h>
typedef struct {
arch_rwlock_t raw_lock;
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} rwlock_t;
...
/*
* The queue read/write lock data structure
*/

typedef struct qrwlock {
union {
atomic_t cnts;
struct {
#ifdef __LITTLE_ENDIAN
u8 wlocked; /* Locked for write? */
u8 __lstate[3];
#else
u8 __lstate[3];
u8 wlocked; /* Locked for write? */
#endif
};
};
arch_spinlock_t wait_lock;
} arch_rwlock_t;

常用的函数如下:

  • rwlock_init():初始化 rwlock
  • write_lock():申请写者锁
  • write_unlock():释放写者锁
  • read_lock():申请读者锁
  • read_unlock():释放读者锁
  • read_lock_irq():关闭中断并且申请读者锁
  • write_lock_irg():关闭中断并且申请写者锁
  • write_unlock_irq():打开中断并且释放写者锁

和自旋锁一样,读写锁有关闭中断和下半部的版本。自旋锁类型的读写锁实现比较简单。

总结

  • 互斥锁最先实现自旋等待机制
  • 互斥锁在睡眠之前尝试获取锁
  • 互斥锁通过实现MCS锁来避免多个CPU争用锁而导致CPU高速缓存行颠簸现象,正是因为互斥锁的简洁性和高效性,所以互斥锁的使用场景比信号量要更严格

使用互斥锁需要注意的约束条件如下:

  • 同ー时刻只有一个线程可以持有互斥锁
  • 只有锁持有者可以解锁。不能在一个进程中持有互斥锁,而在另外一个进程释放它。因此互斥锁不适合内核与用户空间复杂的同步场景,信号量和读写信号量则比较合适
  • 不允许递归地加锁和解锁
  • 当进程持有互斥锁时,进程不可以退出
  • 互斥锁必须使用官方接口函数来初始化
  • 互斥锁可以睡眠,所以不允许在中断处理程序或者中断下半部(如 tasklet、定时器)中使用

在实际项目中,该如何选择自旋锁、信号量和互斥锁呢? 在中断上下文中可以毫不犹豫地使用自旋锁,如果临界区有睡眠、隐含睡眠的动作及内核接口函数,应避免选择自旋锁。在信号量和互斥锁中该如何选择呢?除非代码场景不符合上述互斥锁的约束中的某一条,否则可以优先使用互斥锁。

参考文献

《奔跑吧Linux内核》