07_线程控制与同步(pthread)
虚假唤醒:线程从返回,但条件并未真正满足。操作系统信号导致wait提前返回多核 CPU 的竞态时序必须用while重新检查条件// 正确:while 重新检查// 错误:if 只检查一次,虚假唤醒时可能条件仍不满足核心原则:永远在while循环中调用。
在多线程编程中,线程是操作系统调度的最小单位,而进程是资源分配的最小单位。POSIX 线程(pthread)是 Linux 平台上事实上的线程标准库,掌握其 API 和同步机制是嵌入式 Linux 后端开发面试的核心考点。本文从线程概念入手,深入 pthread 基本 API、互斥锁、读写锁、条件变量、线程池手撕,并总结高频面试题。
目录
手撕:简化线程池(mutex + cond + queue + threads)
一、线程概念
线程是轻量级进程(LWP, Light-Weight Process),在 Linux 中本质上是「共享地址空间的进程」,通过 clone() 系统调用创建,共享资源的同时拥有私有数据。
同一进程的线程共享:
- 地址空间(代码段、数据段、堆)
- 文件描述符表
- 信号处理函数
- 当前工作目录
每个线程独有:
- 栈(包括内核栈和用户栈)
- 寄存器(包括 PC 指针)
errno变量- 线程 ID(TID)
- 信号掩码(signal mask)
- 调度优先级
线程 vs 进程对比
| 维度 | 线程 | 进程 |
|---|---|---|
| 创建开销 | 小(clone 共享地址空间) | 大(fork 复制页表) |
| 上下文切换 | 快(同进程内切换) | 慢(切换地址空间 + TLB 刷新) |
| 资源共享 | 共享地址空间、fd 等 | 独立地址空间,需 IPC |
| 通信方式 | 直接读写全局变量 | 管道、消息队列、共享内存等 |
| 安全性 | 一个线程崩溃可能导致整个进程退出 | 进程间隔离性强 |
| 资源回收 | 需要 join 或 detach,否则泄漏 | 子进程结束后由 init 回收 |
二、pthread 基本 API
核心函数
#include <pthread.h>
// 创建线程
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg);
// 参数:线程 ID 输出、属性、线程函数、参数
// 返回:0 成功,错误码失败
// 等待线程(阻塞,回收资源)
int pthread_join(pthread_t thread, void **retval);
// 分离线程(自动回收,不可再 join)
int pthread_detach(pthread_t thread);
// 退出当前线程
void pthread_exit(void *retval);
// 获取当前线程 ID
pthread_t pthread_self(void);
代码示例
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void *worker(void *arg) {
int id = *(int *)arg;
printf("Thread %d: hello\n", id);
return (void *)(long)(id * 2);
}
int main() {
pthread_t tid;
int val = 42;
pthread_create(&tid, NULL, worker, &val);
void *ret;
pthread_join(tid, &ret);
printf("Main thread: ret = %ld\n", (long)ret);
return 0;
}
编译链接加 -lpthread:
gcc demo.c -o demo -lpthread
三、线程同步——互斥锁(mutex)
为什么需要同步?
多个线程同时访问共享资源时,读写操作交错执行,导致结果不可预测,产生竞态条件(race condition)。需要保护的代码区域称为临界区(critical section)。
API
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 静态初始化
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
底层实现原理——futex
Linux mutex 基于 futex(Fast Userspace muTeX) 实现,核心思想:
用户态 CAS 尝试加锁
├── 成功 → 直接返回(无需系统调用,极快)
└── 失败 → futex 系统调用进入内核睡眠(避免忙等)
- fast path:无竞争时,仅在用户态用原子指令(CAS)完成加锁/解锁,不陷入内核
- slow path:有竞争时,通过
futex(FUTEX_WAIT)让线程睡眠,futex(FUTEX_WAKE)唤醒等待线程
代码示例
#include <pthread.h>
#include <stdio.h>
#define LOOP 1000000
int counter = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *increment(void *arg) {
for (int i = 0; i < LOOP; i++) {
pthread_mutex_lock(&mutex);
counter++;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main() {
pthread_t t1, t2;
pthread_create(&t1, NULL, increment, NULL);
pthread_create(&t2, NULL, increment, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("counter = %d (expected %d)\n", counter, 2 * LOOP);
// 不加锁时结果 < 2*LOOP,加锁后等于 2*LOOP
pthread_mutex_destroy(&mutex);
return 0;
}
四、读写锁(rwlock)
适用场景
读多写少的场景(如配置文件读取、缓存查找),读操作之间不互斥,读写和写写之间互斥。
API
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 读锁——可多个线程同时持有
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 写锁——独占
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
策略对比
| 策略 | 行为 | 优缺点 |
|---|---|---|
| 读者优先 | 只要有读者在读,写者就一直等待 | 写者可能饿死 |
| 写者优先 | 有写者等待时,新读者被阻塞 | 读者延迟增加,但写者不饿死 |
Linux pthread_rwlock 默认实现倾向于写者优先。
五、条件变量(重点)
为什么需要条件变量?
如果线程需要等待某个条件满足(如队列非空),轮询检查(忙等,busy waiting)浪费 CPU。条件变量让线程在条件不满足时睡眠等待,条件满足时被唤醒。
API
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒一个等待线程
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有等待线程
核心使用模式(必须配合 mutex)
pthread_mutex_lock(&mutex);
while (!condition) ← 必须用 while,不能用 if!
pthread_cond_wait(&cond, &mutex); ← wait 自动释放 mutex 并阻塞
// 条件满足,处理业务
pthread_mutex_unlock(&mutex);
为什么必须用 while 而不是 if?
虚假唤醒(spurious wakeup):pthread_cond_wait 可能在没有收到 signal 的情况下返回。用 while 循环重新检查条件,确保条件真正满足。
signal vs broadcast
| 函数 | 唤醒数量 | 适用场景 |
|---|---|---|
pthread_cond_signal |
唤醒 1 个等待线程 | 所有等待线程等价,只需一个处理(如任务到来) |
pthread_cond_broadcast |
唤醒 所有 等待线程 | 条件变化可能对多个线程相关(如资源释放) |
代码示例:生产者-消费者模型
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#define BUFFER_SIZE 5
int buffer[BUFFER_SIZE];
int count = 0, in = 0, out = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
void *producer(void *arg) {
for (int i = 0; i < 20; i++) {
pthread_mutex_lock(&mutex);
while (count == BUFFER_SIZE) // 缓冲区满,等待
pthread_cond_wait(¬_full, &mutex);
buffer[in] = i;
in = (in + 1) % BUFFER_SIZE;
count++;
printf("produced: %d\n", i);
pthread_cond_signal(¬_empty); // 通知消费者
pthread_mutex_unlock(&mutex);
}
return NULL;
}
void *consumer(void *arg) {
for (int i = 0; i < 20; i++) {
pthread_mutex_lock(&mutex);
while (count == 0) // 缓冲区空,等待
pthread_cond_wait(¬_empty, &mutex);
int val = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count--;
printf("consumed: %d\n", val);
pthread_cond_signal(¬_full); // 通知生产者
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main() {
pthread_t p, c;
pthread_create(&p, NULL, producer, NULL);
pthread_create(&c, NULL, consumer, NULL);
pthread_join(p, NULL);
pthread_join(c, NULL);
return 0;
}
六、线程池(手撕重点)
线程池的组成
┌──────────────────────────────────┐
│ 线程池 │
│ ┌────────────────────┐ │
│ │ 任务队列 │ │
│ │ [task1][task2]... │ │
│ └───────┬────────────┘ │
│ │ 取任务 │
│ ┌───────┴─────────┐ │
│ │ 工作线程们 │ │
│ │ T1 T2 T3 T4 │ │
│ └─────────────────┘ │
│ 同步机制:mutex + condition │
└──────────────────────────────────┘
线程池的优点
- 减少创建/销毁开销:线程复用,避免频繁
pthread_create/pthread_join - 控制并发数:防止过多线程导致系统资源耗尽
- 任务与执行分离:解耦提交逻辑和执行逻辑
简化版线程池实现
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
typedef struct Task {
void (*func)(void *arg);
void *arg;
struct Task *next;
} Task;
typedef struct ThreadPool {
Task *head; // 任务队列头
Task *tail; // 任务队列尾
pthread_t *threads; // 工作线程数组
int thread_count;
int shutdown; // 关闭标志
pthread_mutex_t mutex;
pthread_cond_t cond;
} ThreadPool;
// 工作线程函数
void *worker(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
while (1) {
pthread_mutex_lock(&pool->mutex);
// 等待任务到来或线程池关闭
while (pool->head == NULL && !pool->shutdown)
pthread_cond_wait(&pool->cond, &pool->mutex);
if (pool->shutdown && pool->head == NULL) {
pthread_mutex_unlock(&pool->mutex);
pthread_exit(NULL);
}
// 取任务
Task *task = pool->head;
pool->head = task->next;
if (pool->tail == task)
pool->tail = NULL;
pthread_mutex_unlock(&pool->mutex);
// 执行任务(在锁外执行,避免长时间持有锁)
task->func(task->arg);
free(task);
}
return NULL;
}
// 初始化线程池
ThreadPool *thread_pool_create(int thread_count) {
ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
pool->head = pool->tail = NULL;
pool->thread_count = thread_count;
pool->shutdown = 0;
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->cond, NULL);
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
for (int i = 0; i < thread_count; i++)
pthread_create(&pool->threads[i], NULL, worker, pool);
return pool;
}
// 提交任务
void thread_pool_submit(ThreadPool *pool, void (*func)(void *), void *arg) {
Task *task = (Task *)malloc(sizeof(Task));
task->func = func;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock(&pool->mutex);
if (pool->tail == NULL) {
pool->head = pool->tail = task;
} else {
pool->tail->next = task;
pool->tail = task;
}
pthread_cond_signal(&pool->cond); // 唤醒一个工作线程
pthread_mutex_unlock(&pool->mutex);
}
// 销毁线程池
void thread_pool_destroy(ThreadPool *pool) {
pthread_mutex_lock(&pool->mutex);
pool->shutdown = 1;
pthread_mutex_unlock(&pool->mutex);
pthread_cond_broadcast(&pool->cond); // 唤醒所有线程
for (int i = 0; i < pool->thread_count; i++)
pthread_join(pool->threads[i], NULL);
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond);
free(pool->threads);
free(pool);
}
// 测试用例
void print_num(void *arg) {
int n = *(int *)arg;
printf("task: %d\n", n);
}
int main() {
ThreadPool *pool = thread_pool_create(4);
int nums[10];
for (int i = 0; i < 10; i++) {
nums[i] = i;
thread_pool_submit(pool, print_num, &nums[i]);
}
sleep(1); // 等任务执行完
thread_pool_destroy(pool);
return 0;
}
七、线程安全
可重入函数 vs 线程安全
| 概念 | 定义 | 核心要求 |
|---|---|---|
| 可重入函数 | 函数在被中断后再次调用也能正确执行 | 不访问全局/静态变量,只使用局部变量和参数 |
| 线程安全函数 | 多个线程同时调用也能正确工作 | 通过锁保护共享资源 |
- 可重入函数一定是线程安全的,但反之不成立
- 典型不可重入:
strtok()(内部静态指针维护状态)→ 线程安全版本strtok_r()
线程安全常见实现方式
- 互斥锁:保护共享数据
- 原子操作:
__sync_fetch_and_add系列,C11_Atomic - 读写锁:读多写少场景
- RCU(Read-Copy-Update):内核级无锁读方案
Thread Local Storage(TLS)
每个线程拥有独立的变量副本,互不干扰。
// GCC 扩展
__thread int tls_var = 0;
// C11 标准
_Thread_local int tls_var = 0;
// C++11
thread_local int tls_var = 0;
典型应用:errno 在 glibc 中就是用 TLS 实现的,每个线程有独立的 errno。
八、面试高频问题
Q: 进程和线程的区别?
从以下四个维度答:
| 维度 | 进程 | 线程 |
|---|---|---|
| 创建开销 | fork 复制页表、文件描述符等,开销大 |
clone 共享地址空间,只需分配栈,开销小 |
| 资源共享 | 独立地址空间,通过 IPC 通信 | 共享全局变量、堆、fd,通信直接 |
| 上下文切换 | 切换地址空间,TLB 刷新,慢 | 同进程内切换不涉及地址空间切换,快 |
| 安全性 | 进程崩溃不影响其他进程 | 一个线程 crash(如 SIGSEGV)整个进程退出 |
Q: mutex 是如何实现的?
基于 futex(Fast Userspace muTeX) 实现,两层结构:
线程 A 加锁成功 线程 B 加锁失败
用户态 CAS 原子操作 CAS 失败
│ │
▼ ▼
进入临界区 futex(FUTEX_WAIT)
进入内核睡眠
线程 A 解锁
│
▼
futex(FUTEX_WAKE) → 唤醒线程 B
- 无竞争:纯用户态原子操作,0 次系统调用
- 有竞争:一次
futex系统调用进入内核睡眠/唤醒 - 相比于纯内核锁(如 SysV 信号量),futex 在无竞争时性能极好
Q: 条件变量为什么必须配合 mutex?
避免 wait 条件判断 和 线程挂起 之间的 竞态条件:
// 错误做法(无 mutex)
if (queue.empty()) ──→ 判断时队列为空
cond_wait(&cond); ──→ 此时另一个线程刚放入数据并 signal
signal 丢失,当前线程永远睡眠!
pthread_cond_wait(&cond, &mutex) 的原子语义:释放 mutex + 进入睡眠 是原子操作,不会丢失信号。
Q: 什么是虚假唤醒?为什么必须用 while 判断条件?
虚假唤醒:线程从 pthread_cond_wait 返回,但条件并未真正满足。可能原因:
- 操作系统信号导致
wait提前返回 - 多核 CPU 的竞态时序
必须用 while 重新检查条件:
// 正确:while 重新检查
while (count == 0)
pthread_cond_wait(&cond, &mutex);
// 错误:if 只检查一次,虚假唤醒时可能条件仍不满足
if (count == 0) // BUG!
pthread_cond_wait(&cond, &mutex);
核心原则:永远在 while 循环中调用 pthread_cond_wait。
Q: 线程池的参数如何确定?
CPU 密集型任务:线程数 ≈ CPU 核心数 + 1
→ 避免过多上下文切换
→ +1 是为了防止缺页中断导致 CPU 空闲
IO 密集型任务:线程数 ≈ CPU 核心数 × 2
→ IO 等待时 CPU 空闲,可让更多线程执行
→ 实际可更高(取决于 IO 等待时间占比)
通用公式:线程数 = 核心数 × (1 + 等待时间/计算时间)
手撕:生产者消费者(条件变量版本)
见五、条件变量中的代码示例。
手撕:简化线程池(mutex + cond + queue + threads)
见六、线程池中的代码示例。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐


所有评论(0)