原图

概述

工作队列不完全是中断处理程序的下半部。内核的很多模块需要异步执行函数,这些模块也可以创建一个内核线程来异步执行函数。但是,如果每个模块都创建自己的内核线程,会造成内核线程的数量过多,内存消耗比较大,影响系统性能。所以,最好的方法是提供一种通用机制,让这些模块把需要异步执行的函数交给工作队列执行,共享内核线程,节省资源。


实现(扩展🙈)

首先介绍一下工作队列使用的术语。

  • work:工作,也称为工作项。
  • work_queue:工作队列,就是工作的集合,work_queue 和 work 是一对多的关系。
  • worker:工人,一个工人对应一个内核线程,我们把工人对应的内核线程称为工人线程。
  • worker_pool:工人池,就是工人的集合,工人池和工人是一对多的关系。
  • pool_workqueue:中介,负责建立工作队列和工人池之间的关系。工作队列和 pool_workqueue 是一对多的关系,pool_workqueue 和工人池是一对一的关系。

数据结构

工作队列分为两种。

  • 绑定处理器的工作队列:默认创建绑定处理器的工作队列,每个工人线程绑定到一个处理器
  • 不绑定处理器的工作队列:创建工作队列的时候需要指定标志位 WO_UNBOUND,工人线程不绑定到某个处理器,可以在处理器之间迁移

绑定处理器的工作队列的数据结构如下图所示,工作队列在每个处理器上有一个 pool_workqueue 实例,一个 pool_workqueue 实例对应一个工人池,一个工人池有一条工人链表,每个工人对应一个内核线程。向工作队列中添加工作项的时候,选择当前处理器的 pool_workqueue 实例、工人池和工人线程。 原图

不绑定处理器的工作队列的数据结构如下图所示,工作队列在每个内存节点上有一个 pool_workqueue 实例,一个 pool_workqueue 实例对应一个工人池,一个工人池有一条工人链表,每个工人对应一个内核线程。向工作队列中添加工作项的时候,选择当前处理器所属的内存节点的 pool_workqueue 实例、工人池和工人线程。 原图

不绑定处理器的工作队列还有一个默认的 pool_workaueue 实例(workqueue_struct.dfl_pwq),当某个处理器下线的时候,使用默认的 pool_workqueue 实例。

添加工作项

函数 queue_work() 用来向工作队列中添加一个工作项,把主要工作委托给函数 queue_work_on(),把第一个参数“int cpu”设置为 WORK_CPU_UNBOUND,意思是不绑定到任何处理器,优先选择当前处理器。

1
2
3
4
5
6
<include/linux/workqueue.h>
static inline bool queue_work(struct workqueue_struct *wq,
struct work_struct *work)
{
return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

函数 queue_work_on 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<kernel/workqueue.c>
bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
bool ret = false;
unsigned long flags;

local_irq_save(flags);

if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work);
ret = true;
}

local_irq_restore(flags);
return ret;
}

如果工作项没有添加过,那么给工作项设置标志位 WORK_STRUCT_PENDING_BIT 然后把主要工作委托给函数__queue_work()。函数 queue_work 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
static void __queue_work(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct pool_workqueue *pwq;
struct worker_pool *last_pool;
struct list_head *worklist;
unsigned int work_flags;
unsigned int req_cpu = cpu;

lockdep_assert_irqs_disabled();

if (unlikely(wq->flags & __WQ_DRAINING) &&
WARN_ON_ONCE(!is_chained_work(wq)))
return;
rcu_read_lock();
retry:
/* 1. 从工作队列中选择 pool_workqueue 实例。如果是绑定处理器的工作队列,
* 那么选择当前处理器的 pool_workqueue 实例如果是不绑定处理器的工作队列
* 那么选择当前处理器所属的内存节点的 pool_workqueue 实例。
*/
if (wq->flags & WQ_UNBOUND) {
if (req_cpu == WORK_CPU_UNBOUND)
cpu = wq_select_unbound_cpu(raw_smp_processor_id());
pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
} else {
if (req_cpu == WORK_CPU_UNBOUND)
cpu = raw_smp_processor_id();
pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
}

/* 2. 如果@work 之前在其他 pool_workaueue 中,它可能仍在那里运行,在这种情况下,
* 工作需要在该池上排队以保证不可重入。
*/
last_pool = get_work_pool(work);
if (last_pool && last_pool != pwq->pool) {
struct worker *worker;

raw_spin_lock(&last_pool->lock);

worker = find_worker_executing_work(last_pool, work);

if (worker && worker->current_pwq->wq == wq) {
pwq = worker->current_pwq;
} else {
/* meh... not running there, queue here */
raw_spin_unlock(&last_pool->lock);
raw_spin_lock(&pwq->pool->lock);
}
} else {
raw_spin_lock(&pwq->pool->lock);
}

/* 3. 如果 pool_workqueue 实例的未处理工作数量小于限制,那么把工作添加到 pool_workqueue 实例
* 对应的工人池的链表 worklist 中;如果 pool_workqueue 实例的未处理工作数量达到限制,那么给
* 工作设置标志位 WORK_STRUCT_DELAYED,并把工作添加到 pool_workqueue 实例的链表 delayed_works 中。
*/
if (unlikely(!pwq->refcnt)) {
if (wq->flags & WQ_UNBOUND) {
raw_spin_unlock(&pwq->pool->lock);
cpu_relax();
goto retry;
}
/* oops */
WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
wq->name, cpu);
}

/* pwq determined, queue */
trace_workqueue_queue_work(req_cpu, pwq, work);

if (WARN_ON(!list_empty(&work->entry)))
goto out;

pwq->nr_in_flight[pwq->work_color]++;
work_flags = work_color_to_flags(pwq->work_color);

if (likely(pwq->nr_active < pwq->max_active)) {
trace_workqueue_activate_work(work);
pwq->nr_active++;
worklist = &pwq->pool->worklist;
if (list_empty(worklist))
pwq->pool->watchdog_ts = jiffies;
} else {
work_flags |= WORK_STRUCT_DELAYED;
worklist = &pwq->delayed_works;
}

debug_work_activate(work);
/* 4. 把工作添加到注释 3 中描述的链表中 */
insert_work(pwq, work, worklist, work_flags);

out:
raw_spin_unlock(&pwq->pool->lock);
rcu_read_unlock();
}

工人处理工作

每个工人对应一个内核线程,一个工人池对应一个或多个工人。多个工人从工人池的未处理工作链表(worker_pool.worklist)中取工作并处理。工人线程的处理函数是 worker_thread(),调用函数 process_one_work() 处理一个工作项。函数 worker_thread() 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
<kernel/workqueue.c>
static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct worker_pool *pool = worker->pool;

/* tell the scheduler that this is a workqueue worker */
set_pf_worker(true);
woke_up:
raw_spin_lock_irq(&pool->lock);

/* 1. 如果工人太多,想要减少工人的数量,那么当前工人线程退出 */
/* am I supposed to die? */
if (unlikely(worker->flags & WORKER_DIE)) {
raw_spin_unlock_irq(&pool->lock);
WARN_ON_ONCE(!list_empty(&worker->entry));
set_pf_worker(false);

set_task_comm(worker->task, "kworker/dying");
ida_simple_remove(&pool->worker_ida, worker->id);
worker_detach_from_pool(worker);
kfree(worker);
return 0;
}

/* 2. 工人退出空闲状态 */
worker_leave_idle(worker);
recheck:
/* 3. 如果不需要本工人执行工作,那么本工人进入空闲状态 */
/* no more worker necessary? */
if (!need_more_worker(pool))
goto sleep;

/* 4. 如果工人池中没有空闲的工人,那么创建一些工人使用*/
/* do we need to manage? */
if (unlikely(!may_start_working(pool)) && manage_workers(worker))
goto recheck;

/*
* ->scheduled list can only be filled while a worker is
* preparing to process a work or actually processing it.
* Make sure nobody diddled with it while I was sleeping.
*/
WARN_ON_ONCE(!list_empty(&worker->scheduled));

/*
* 5. 完成准备阶段。 我们保证至少有一名闲置工人或其他人已经担任管理角色。
* 这是@worker 开始参与并发管理(如果适用)并在反弹后恢复并发管理的地方。
* 有关详细信息,请参阅 rebind_workers()。
*/
worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

/* 6. 从工人池的链表 worklist 中取一个工作 */
do {
struct work_struct *work =
list_first_entry(&pool->worklist,
struct work_struct, entry);

pool->watchdog_ts = jiffies;

/* 7. 如果是正常工作,那么调用函数 process_one_work() 执行正常工作,
* 然后执行工人的链表 scheduled 中的特殊工作
*/
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
/* 8. 如果是特殊工作,那么首先把工作添加到工人的链表 scheduled 的尾部,
* 然后执行工人的链表 scheduled 中的特殊工作
*/
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(pool)); //如果有工作需要处理,并且处于运行状态的工人数量不超过 1,那么本工人继续执行工作

worker_set_flags(worker, WORKER_PREP);
sleep: //工人进入空闲状态,睡眠
/*
* pool->lock is held and there's no work to process and no need to
* manage, sleep. Workers are woken up only while holding
* pool->lock or from local cpu, so setting the current state
* before releasing pool->lock is enough to prevent losing any
* event.
*/
worker_enter_idle(worker);
__set_current_state(TASK_IDLE);
raw_spin_unlock_irq(&pool->lock);
schedule();
goto woke_up;
}

下面解释一下正常工作和特殊工作。 向工作队列中添加正常工作,是直接添加到工人池的链表 worklist 中。 调用函数 flush_work(t) 等待工作 t 执行完,实现方法是添加一个特殊工作:屏障工作,执行这个屏障工作的时候就可以确定工作 t 执行完。如果工作 t 正在被工人 p 执行,那么把屏障工作直接添加到工人 p 的链表 scheduled 中;如果工作 t 没有执行,那么把屏障工作添加到工人池的链表 worklist 中,并且给屏障工作设置标志位 WORK_STRUCT_LINKED。函数 process_one_work() 负责处理一个工作,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
<kernel/workqueue.c>
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
struct pool_workqueue *pwq = get_work_pwq(work);
struct worker_pool *pool = worker->pool;
bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
int work_color;
struct worker *collision;

/* ensure we're on the correct CPU */
WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
raw_smp_processor_id() != pool->cpu);

/* 1. 一个工作不应该被多个工人并发执行。如果一个工作正在被工人的其他工人执行,
* 那么把这个工作添加到这个工人的链表 scheduled 中延后执行
*/
/*
* A single work shouldn't be executed concurrently by
* multiple workers on a single cpu. Check whether anyone is
* already processing the work. If so, defer the work to the
* currently executing one.
*/
collision = find_worker_executing_work(pool, work);
if (unlikely(collision)) {
move_linked_works(work, &collision->scheduled, NULL);
return;
}

/* 2. 把工人添加到工人池的散列表 busy_hash 中。工人的成员 current_work 指向当前工作,
* 成员 current_func 指向当前工作的处理函数,成员 current_pwq 指向当前 pool_workqueue 实例
*/
debug_work_deactivate(work);
hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
worker->current_work = work;
worker->current_func = work->func;
worker->current_pwq = pwq;
work_color = get_work_color(work);

/*
* Record wq name for cmdline and debug reporting, may get
* overridden through set_worker_desc().
*/
strscpy(worker->desc, pwq->wq->name, WORKER_DESC_LEN);

list_del_init(&work->entry);

/* 3. 如果工作队列是处理器密集型的,那么给工人设置标志位 WORKER_CPU_INTENSIVE,
* 工人不再被工人池动态调度。这使@worker 脱离了并发管理,下一个代码块将链接
* 执行待处理的工作项
*/
/*
* CPU intensive works don't participate in concurrency management.
* They're the scheduler's responsibility. This takes @worker out
* of concurrency management and the next code block will chain
* execution of the pending work items.
*/
if (unlikely(cpu_intensive))
worker_set_flags(worker, WORKER_CPU_INTENSIVE);

/* 4. 对于不绑定处理器或处理器密集型的工作队列,唤醒更多空闲的工人处理工作 */
/*
* Wake up another worker if necessary. The condition is always
* false for normal per-cpu workers since nr_running would always
* be >= 1 at this point. This is used to chain execution of the
* pending work items for WORKER_NOT_RUNNING workers such as the
* UNBOUND and CPU_INTENSIVE ones.
*/
if (need_more_worker(pool))
wake_up_worker(pool);

/* 5. 记录最后一个池并清除 PENDING,这应该是对@work 的最后一次更新。 此外,
* 在@pool->lock 中执行此操作,以便在禁用 IRQ 时同时发生 PENDING 和排队状态更改
*/
/*
* Record the last pool and clear PENDING which should be the last
* update to @work. Also, do this inside @pool->lock so that
* PENDING and queued state changes happen together while IRQ is
* disabled.
*/
set_work_pool_and_clear_pending(work, pool->id);

raw_spin_unlock_irq(&pool->lock);

lock_map_acquire(&pwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);

lockdep_invariant_state(true);
trace_workqueue_execute_start(work);
/* 6. 执行工作的处理函数 */
worker->current_func(work);
/*
* While we must be careful to not use "work" after this, the trace
* point will only record its address.
*/
trace_workqueue_execute_end(work, worker->current_func);
lock_map_release(&lockdep_map);
lock_map_release(&pwq->wq->lockdep_map);

if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
" last function: %ps\n",
current->comm, preempt_count(), task_pid_nr(current),
worker->current_func);
debug_show_held_locks(current);
dump_stack();
}

/*
* The following prevents a kworker from hogging CPU on !PREEMPTION
* kernels, where a requeueing work item waiting for something to
* happen could deadlock with stop_machine as such work item could
* indefinitely requeue itself while all other CPUs are trapped in
* stop_machine. At the same time, report a quiescent RCU state so
* the same condition doesn't freeze RCU.
*/
cond_resched();

raw_spin_lock_irq(&pool->lock);

/* clear cpu intensive status */
if (unlikely(cpu_intensive))
worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

/* tag the worker for identification in schedule() */
worker->last_func = worker->current_func;

/* we're done with it, release */
hash_del(&worker->hentry);
worker->current_work = NULL;
worker->current_func = NULL;
worker->current_pwq = NULL;
pwq_dec_nr_in_flight(pwq, work_color);
}

工人池动态管理工人

工人池可以动态增加和删除工人,算法如下:

  • 工人有 3 种状态:空闲(idle)、运行(running)和挂起(suspend)。空闲是指没有执行工作,运行是指正在执行工作,挂起是指在执行工作的过程中睡眠。
  • 如果工人池中有工作需要处理,至少保持一个处在运行状态的工人来处理。
  • 如果处在运行状态的工人在执行工作的过程中进入挂起状态,为了保证其他工作的执行,需要唤醒空闲的工人处理工作。
  • 如果有工作需要执行,并且处在运行状态的工人数量大于 1,会让多余的工人进入空闲状态。
  • 如果没有工作需要执行,会让所有工人进入空闲状态。
  • 如果创建的工人过多,工人池把空闲时间超过 300 秒(IDLE_WORKER_TIMEOUT)的工人删除。

工人池的调度思想是如果有工作需要处理,保持一个处在运行状态的工人来处理,不多也不少。 这种做法有个问题:如果工作是处理器密集型的,虽然工人没有进入挂起状态,但是会长时间占用处理器,让后续的工作阻塞太长时间。 为了解决这个问题,可以在创建工作队列的时候设置标志位 WQ_CPUINTENSIVE,声明工作队列是处理器密集的。当一个工人执行工作的时候,让这个工人不受工人池动态调度,像是进入了挂起状态,工人池创建新的工人来执行后续的工作。

参考文献

https://zhuanlan.zhihu.com/p/91106844

https://zhuanlan.zhihu.com/p/94561631

https://www.cnblogs.com/arnoldlu/p/8659988.html

《Linux 内核深度解析》