Linux 中断子系统(六)Tasklet

原图

为什么有 tasklet

linux kernel 已经把中断处理分成了 top half 和 bottom half,看起来已经不错了,那为何还要提供 softirq、tasklet 和 workqueue 这些 bottom half 机制。

workqueue 和 softirq、tasklet 有本质的区别:

  • workqueue 运行在 process context,而 softirq 和 tasklet 运行在 interrupt context。因此,出现 workqueue 是不奇怪的,在有 sleep 需求的场景中需要。
  • softirq 更倾向于性能。软中断的种类是编译时静态定义的,在运行时不能添加或删除,同一种软中断的处理函数可以在多个处理器上同时执行,处理函数必须是可以重入的,需要使用锁保护临界区。
  • tasklet 更倾向于易用性。Tasklet 可以在运行时添加或删除,一个 Tasklet 同一时刻只能在一个处理器上执行,不要求处理函数是可以重入的。

数据结构

Tasklet 的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<include/linux/interrupt.h>
struct tasklet_struct
{
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
bool use_callback;
union {
void (*func)(unsigned long data);
void (*callback)(struct tasklet_struct *t);
};
unsigned long data;
};

成员 next 用来把 Tasklet 添加到单向链表中。 成员 state 是 Tasklet 的状态,取值如下:

  • 0: Tasklet 没有被调度
  • (1 << TASKLET_STATE_SCHED): Tasklet 被调度,即将被执行
  • (1<< TASKLET_STATE_RUN):只在多处理器系统中使用,表示 Tasklet 正在执行

成员 count 是计数,0 表示允许 Tasklet 被执行,非零值表示禁止 Tasklet 被执行成员 func 是处理函数,成员 data 是传给处理函数的参数。 每个处理器有两条单向链表:低优先级 Tasklet 链表和高优先级 Tasklet 链表。

1
2
3
4
5
6
7
<kernel/softirq.c>
struct tasklet_head {
struct tasklet_struct *head;
struct tasklet_struct **tail;
};
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

实现(扩展🙈)

Tasklet 是基于软中断实现的,根据优先级分为两种,低优先级 Tasklet 和高优先级 Tasklet。软中断 HI_SOFTIRO 执行高优先级 Tasklet,软中断 TASKLET_SOFTIRQ 执行低优先级 Tasklet。

调度 Tasklet

函数 tasklet_schedule() 用来调度低优先级 Tasklet,函数 tasklet_hi_schedule() 用来调度高优先级 Tasklet。以函数 tasklet_schedule() 为例说明,其代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<include/linux/interrupt.h、kernel/softirq.c>
static inline void tasklet_schedule(struct tasklet_struct *t)
{
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
...
void __tasklet_schedule(struct tasklet_struct *t)
{
__tasklet_schedule_common(t, &tasklet_vec,
TASKLET_SOFTIRQ);
}
...
static void __tasklet_schedule_common(struct tasklet_struct *t,
struct tasklet_head __percpu *headp,
unsigned int softirq_nr)
{
struct tasklet_head *head;
unsigned long flags;

local_irq_save(flags);
head = this_cpu_ptr(headp);
t->next = NULL;
*head->tail = t;
head->tail = &(t->next);
raise_softirq_irqoff(softirq_nr);
local_irq_restore(flags);
}

如果 Tasklet 没有被调度过,那么首先设置调度标志位,然后把 Tasklet 添加到当前处理器的低优先级 Tasklet 链表的尾部,最后触发软中断 TASKLET_SOFTIRQ。

执行 Tasklet

初始化的时候,把软中断 TASKLET_SOFTIRQ 的处理函数注册为函数 tasklet_action,把软中断 HI_SOFTIRQ 的处理函数注册为函数 tasklet_hi_action()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<kernel/softirq.c>
void __init softirq_init(void)
{
int cpu;

for_each_possible_cpu(cpu) {
per_cpu(tasklet_vec, cpu).tail =
&per_cpu(tasklet_vec, cpu).head;
per_cpu(tasklet_hi_vec, cpu).tail =
&per_cpu(tasklet_hi_vec, cpu).head;
}

open_softirq(TASKLET_SOFTIRQ, tasklet_action);
open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

以函数 tasklet_action() 为例说明,其代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<kernel/softirq.c>
static __latent_entropy void tasklet_action(struct softirq_action *a)
{
tasklet_action_common(a, this_cpu_ptr(&tasklet_vec), TASKLET_SOFTIRQ);
}
static void tasklet_action_common(struct softirq_action *a,
struct tasklet_head *tl_head,
unsigned int softirq_nr)
{
struct tasklet_struct *list;

local_irq_disable();
list = tl_head->head;
tl_head->head = NULL;

/* 1. 把当前处理器的低优先级 Tasklet 链表中的所有 Tasklet 移到临时链表 list 中 */
tl_head->tail = &tl_head->head;
local_irq_enable();

/* 2. 遍历临时链表 list,依次处理每个 Tasklet */
while (list) {
struct tasklet_struct *t = list;

list = list->next;

/* 3. ,尝试锁住 Tasklet,确保一个 Tasklet 同一时刻只在一个处理器上执行 */
if (tasklet_trylock(t)) {

/* 4. 如果 Tasklet 的计数为 0,表示允许 Tasklet 被执行 */
if (!atomic_read(&t->count)) {

/* 5. 清除 Tasklet 的调度标志位,其他处理器可以调度这个 Tasklet,但是不能执行这个 Tasklet */
if (tasklet_clear_sched(t)) {
if (t->use_callback)
/* 6. 执行 Tasklet 的处理函数 */
t->callback(t);
else
/* 6. 执行 Tasklet 的处理函数 */
t->func(t->data);
}
tasklet_unlock(t);
continue;
}

/* 7. 释放 Tasklet 的锁,其他处理器就可以执行这个 Tasklet 了 */
tasklet_unlock(t);
}

/* 8. 如果尝试锁住 Tasklet 失败(表示 Tasklet 正在其他处理器上执行),或者禁止 Tasklet 被执行,那么把 Tasklet 重新添加到当前处理器的低优先级 Tasklet 链表的尾部,然后触发软中断 TASKLET_SOFTIRQ */
local_irq_disable();
t->next = NULL;
*tl_head->tail = t;
tl_head->tail = &t->next;
__raise_softirq_irqoff(softirq_nr);
local_irq_enable();
}
}

在上面的注释 4 如果 Tasklet 的计数为 0,表示允许 Tasklet 被执行 ,tasklet 就是通过这个来保证只在一个 cpu 上执行。如果该 tasklet 已经在别的 cpu 上执行了,那么我们将其挂入该 cpu 的 tasklet 链表的尾部,这样,在下一个 tasklet 执行时机到来的时候,kernel 会再次尝试执行该 tasklet,在这个时间点,也许其他 cpu 上的该 tasklet 已经执行完毕了。通过这样代码逻辑,保证了特定的 tasklet 只会在一个 cpu 上执行,不会在多个 cpu 上并发。

参考文献

http://www.wowotech.net/irq_subsystem/soft-irq.html

http://www.wowotech.net/irq_subsystem/tasklet.html

《Linux 内核深度解析》