Linux 并发与同步(二)信号量

原图

信号量

信号量( semaphore)是操作系统中最常用的同步原语之一。自旋锁是一种实现忙等待的锁,而信号量则允许进程进入睡眠状态。简单来说,信号量是一个计数器,它支持两个操作原语,即 P 和 V 操作。

信号量实现

semaphore 数据结构的定义如下。

1
2
3
4
5
6
<include/linux/semaphore.h>
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
  • lock 是自旋锁变量,用于保护 semaphore 数据结构里的 count 和 wait_list 成员
  • count 用于表示允许进入临界区的内核执行路径个数
  • wait_list 链表用于管理所有在该信号量上睡眠的进程,没有成功获取锁的进程会在这个链表上睡眠

通常通过 sema_init() 函数进行信号量的初始化,其中 __SEMAPHORE_INITIALIZER() 完成对 semaphore 数椐结构的填充,val 值通常设为 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define __SEMAPHORE_INITIALIZER(name, n)				\
{ \
.lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \
.count = n, \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
}

#define DEFINE_SEMAPHORE(name) \
struct semaphore name = __SEMAPHORE_INITIALIZER(name, 1)

static inline void sema_init(struct semaphore *sem, int val)
{
static struct lock_class_key __key;
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
}

下面来看 down() 函数。down() 函数有如下一些变体。其中 down() 函数和 down_interruptible() 函数的区别在于, down_interruptible() 函数在争用信号量失败时进入可中断的睡眠状态,而 down() 函数进入不可中断的睡眠状态。若 down_trylock() 函数返回 0,表示成功获取锁:若返回 1 获取锁失败。

1
2
3
4
5
6
extern void down(struct semaphore *sem);
extern int __must_check down_interruptible(struct semaphore *sem);
extern int __must_check down_killable(struct semaphore *sem);
extern int __must_check down_trylock(struct semaphore *sem);
extern int __must_check down_timeout(struct semaphore *sem, long jiffies);
extern void up(struct semaphore *sem);

接下来看 down_interruptible() 函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* down_interruptible - acquire the semaphore unless interrupted
* @sem: the semaphore to be acquired
*
* Attempts to acquire the semaphore. If no more tasks are allowed to
* acquire the semaphore, calling this function will put the task to sleep.
* If the sleep is interrupted by a signal, this function will return -EINTR.
* If the semaphore is successfully acquired, this function returns 0.
*/
int down_interruptible(struct semaphore *sem)
{
unsigned long flags;
int result = 0;

might_sleep();
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
result = __down_interruptible(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);

return result;
}
EXPORT_SYMBOL(down_interruptible);

首先,第 6~9 行代码判断是否进入自旋锁的临界区。注意,后面的操作会临时打开自旋锁,若涉及对信号量中最重要的 cout 的操作,需要自旋锁来保护,并且在某些中断处理函数中也可能会操作该信号量。由于需要关闭本地 CPU 中断,因此这里采用 raw_spin_lock_irqsave() 函数。当成功进入自旋锁的临界区之后,首先判断 sem->count 是否大于 0。如果大于 0,则表明当前进程可以成功地获得信号量,并将 sem->count 值减 1,然后退出。如果 sem->count 小于或等于 0,表明当前进程无法获得该信号量,则调用 __down_interruptible() 函数来执行睡眠操作。

1
2
3
4
static noinline int __sched __down_interruptible(struct semaphore *sem)
{
return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}

__down_interruptible() 函数内部调用__down_common() 函数来实现 state 参数为 TASK_INTERRUPTIBLE。 timeout 参数为 MAX_SCHEDULE_TIMEOUT,是一个很大的 LONG_MAX 值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<down_interruptible()->__down_interruptible()->down_common()>
/*
* Because this function is inlined, the 'state' parameter will be
* constant, and thus optimised away by the compiler. Likewise the
* 'timeout' parameter for the cases without timeouts.
*/
static inline int __sched __down_common(struct semaphore *sem, long state,
long timeout)
{
struct semaphore_waiter waiter;

list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = current;
waiter.up = false;

for (;;) {
if (signal_pending_state(state, current))
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
__set_current_state(state);
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
raw_spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}

timed_out:
list_del(&waiter.list);
return -ETIME;

interrupted:
list_del(&waiter.list);
return -EINTR;
}

semaphore_waiter 数据结构用于描述获取信号量失败的进程,每个进程会有一个 semaphore_waiter 数据结构,并且把当前进程放到信号量 sem 的成员变量 wait_list 链表中,接下来的 for 循环将当前进程的 task_struct、状态设置成 TASK_INTERRUPTIBLE,然后调用 schedule_timeout() 函数主动让出 CPU,相当于当前进程睡眠。注意 schedule_timeou() 函数的参数是 MAX_SCHEDULE_TIMEOUT,它并没有实际等待 MAX_SCHEDULE_TIMEOUT 的时间。当进程再次被调度回来执行时, schedule_timeout() 函数返回并判断再次被调度的原因,当 waiter.up 为 true 时,说明睡眠在 wait_list 队列中的进程被该信号量的 UP 操作唤醒,进程可以获得该信号量。如果进程被其它 CPU 发送的信号或者由于超时等而唤醒,则跳转到 timed_out 或 interrupted 标签处并且返回错误代码。

down_interruptible() 函数中,在调用 __down_interruptible() 函数时加入 sem->lock 的自旋锁,这是自旋锁的一个临界区。前面提到,自旋锁临界区中绝对不能睡眠,难道这是例外?仔细阅读__down_common() 函数,会发现 for 循环在调用 schedule_timeout() 函数主动让出 CPU 时,先调用 raw_spin_unlock_irq() 函数释放了该锁,即调用 schedule_timeout() 函数时已经没有自旋锁了,可以让进程先睡眠,“醒来时”再补加一个锁,这是内核编程的常用技巧。

下面来看与 dowm() 函数对应的 up() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* up - release the semaphore
* @sem: the semaphore to release
*
* Release the semaphore. Unlike mutexes, up() may be called from any
* context and even by tasks which have never called down().
*/
void up(struct semaphore *sem)
{
unsigned long flags;

raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(up);

如果信号量上的等待队列(sem->wait_list)为空,则说明没有进程在等待该信号量,直接把 sem->count 加 1 即可,如果不为空,则说明有进程在等待队列里睡民,需要调用__up() 唤醒它们。

1
2
3
4
5
6
7
8
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
wake_up_process(waiter->task);
}

首先来看 sm->wait_list 中第一个成员 waiter,这个等待队列是先进先出队列,在 dowm() 函数中通过 list_add_tail() 函数添加到等待队列尾部。把 waiter->up 设置为 true, 把然后调用 wake_up_process() 函数唤醒 wite->task 进程。在 down() 函数中, walter->task 进程醒来后会判断 waiter->up 变量是否为 true 如果为 true 则直接回 0,表示该过得成功获取信号量。

读写信号量

rw_semaphore 数据结构

rw_semaphore 数据结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct rw_semaphore {
atomic_long_t count;
/*
* Write owner or one of the read owners as well flags regarding
* the current state of the rwsem. Can be used as a speculative
* check to see if the write owner is running on the cpu.
*/
atomic_long_t owner;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* spinner MCS lock */
#endif
raw_spinlock_t wait_lock;
struct list_head wait_list;
#ifdef CONFIG_DEBUG_RWSEMS
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
  • count 用于表示读写信号量的计数。以前读写信号量的实现用 activity 来表示。若 activity 为 0,表示没有读者和写者:若 activity 为ー 1,表示有写者,若 activity 大于 0,表示有读者。现在 count 的计数方法已经发生了变化

  • wait_list 链表用于管理所有在该信号量上睡眠的进程,没有成功获取锁的进程会睡眠在这个链表上

  • wait_lock 是一个自旋锁变量,用于实现对 rw_semaphore 数据结构中 count 成员的原子操作和保护

  • osq:MCS 锁

  • owner:当写者成功获取锁时, owner 指向锁持有者的 task_struct 数据结构

  • 若 count 初始化为 0,表示没有读者也没有写者

  • 若 count 为正数,表示有 count 个读者

  • 当有写者申请锁时,count 值要加上 RWSEM_ACTIVE_WIRTE_BIAS

  • 当有读者申请锁时,若 count 值要加上 RWSEM_ACTIVE_READ_BIAS,即 count 值加一

  • 当有多个写者中请时,判断 count 值是否等于 RWSEM_ACTIVE_WIRTE_BIAS(-65535)若不相等,说明已经有写者抢先持有锁,要自旋等待或者睡眠

  • 当读者申请锁时,若 count 值加上 RWSEM_ACTIVE_READ_BIAS(1)后还小于 0,说明已经有个写者已经成功申请锁,只能等待写者释放锁

把 count 值当作十六进制或者十进制数不是开发人员的原本设计意图,其实应该把 count 值分成两个字段:Bit[0:31] 为低字段,表示正在持有锁的读者或者写者的个数;Bit[32:63] 为高字段,通常为负数,表示有一个正在持有或者处于 pending 状态的写者,以及等待队列中有读写者在等待。因此 count 值可以看作一个二元数,含义如下

  • RWSEM_ACTIVE_READ_BIAS=0x000 0001 =[0,1],表示有一个读者
  • RWSEM_ACTIVE_WRITE_BIAS=0xFFFF 0000 =[-1,1],表示当前只有一个活跃的写者
  • RWSEM_WAITING_BIAS=0xFFFF 0000=[-1,0],表示睡眠等待队列中有读写者在睡眠等待

kernel/locking/rwsem-xadd.c 代码中有如下一段关于 count 值含义的比较全面的介绍。

  • 0x0000 0000 初始化值,表示没有读者和写者
  • 0x0000 000X:表示有 X 个活跃的读者或者正在申请的读者,没有写者干扰
  • 0xFFFF 000X:或者表示可能有 X 个活跃读者,还有写者正在等待;或者表示有二个写者持有锁,还有多个读者正在等待
  • 0xFFFF 0001:或者表示当前只有一个活跃的写者;或者表示一个活跃或者申请中的读者,还有写者正在睡眠等待
  • 0xFFFF 0000 表示 WAITING_BIAS,有读者或者写者正在等待,但是它们都还没成功获取锁。

申请读者类型信号量

下面来看 down_read() 函数的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
void __sched down_read(struct rw_semaphore *sem)
{
might_sleep();
rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
}
...
#define rwsem_acquire_read(l, s, t, i) lock_acquire_shared(l, s, t, NULL, i)
#define lock_acquire_shared(l, s, t, n, i) lock_acquire(l, s, t, 1, 1, n, i)
...
/*
* We are not always called with irqs disabled - do that here,
* and also avoid lockdep recursion:
*/
void lock_acquire(struct lockdep_map *lock, unsigned int subclass,
int trylock, int read, int check,
struct lockdep_map *nest_lock, unsigned long ip)
{
unsigned long flags;

trace_lock_acquire(lock, subclass, trylock, read, check, nest_lock, ip);

if (!debug_locks)
return;

if (unlikely(!lockdep_enabled())) {
/* XXX allow trylock from NMI ?!? */
if (lockdep_nmi() && !trylock) {
struct held_lock hlock;

hlock.acquire_ip = ip;
hlock.instance = lock;
hlock.nest_lock = nest_lock;
hlock.irq_context = 2; // XXX
hlock.trylock = trylock;
hlock.read = read;
hlock.check = check;
hlock.hardirqs_off = true;
hlock.references = 0;

verify_lock_unused(lock, &hlock, subclass);
}
return;
}

raw_local_irq_save(flags);
check_flags(flags);

lockdep_recursion_inc();
__lock_acquire(lock, subclass, trylock, read, check,
irqs_disabled_flags(flags), nest_lock, ip, 0, 0);
lockdep_recursion_finish();
raw_local_irq_restore(flags);
}
EXPORT_SYMBOL_GPL(lock_acquire);

申请读者锁流程如下:

释放读者类型信号量

1
2
3
4
int up_read(struct rw_semaphore *sem)
{
return perf_singlethreaded ? 0 : pthread_rwlock_unlock(&sem->lock);
}

关于写信号量这里不做详细分析了。

小结

读写信号量在内核中应用广泛,特别是在内存管理中,除了前面介绍的 mm->mmap_sem 还有 RMAP 系统中的的 anon_vma->rwsem、 address_space 数据结构中的 i_mmap_rwsem 等。 总结读写信号量的重要特性:

  • down_read():如果一个进程持有读者锁,那么允许继续申请多个读者锁,申请的写者锁则要等待
  • down_write():如果一个进程持有写者锁,那么第二个进程申请该写者锁要自旋等待申请读者锁则要等待
  • up_write() up_read():如果等待队列中第一个成员是写者,那么唤醒该写者;否则唤醒排在等待队列中最前面连续的几个读者

总结

信号量有一个有趣的特点,它可以同时允许任意数量的锁持有者。信号量初始化函数为 sema_init(struct semaphore *sem, int val),其中 val 的值可以大于或等于 1。当 val 大于 1 时,表示允许在同一时刻至多有 val 个锁持有者,这种信号量叫作计数信号量( counting semaphore);当 val 等于 1 时,同一时刻仅允许一个 CPU 持有锁,这种信号量叫作互斥信号量或者二进制信号量( binary semaphore)。在 Linux 内核中,大多使用 val 值为 1 的信号量。相比自旋锁,信号量是一个允许睡眠的锁。

参考文献

《奔跑吧 Linux 内核》