⭐️在这个怀疑的年代,我们依然需要信仰。

个人主页 :YYYing.

⭐️C++大型项目系列专栏:C++大型项目之高性能服务器框架

系列上期内容:【C++项目之高性能服务器框架 (一) 】日志系统&配置系统

系列下期内容:【C++项目之高性能服务器框架 (三) 】协程调度器&定时器&IO协程管理器篇 (上)


目录

前言:

一、线程与协程模块介绍

二、整体架构图

三、基础概念:进程、线程、协程

3.1 三者对比

3.2 Linux 用户态上下文切换:ucontext

四、Thread 类详解

4.1 定义(thread.h)

4.2 构造函数(thread.cc)

4.3 析构函数(thread.cc)

4.4 run() —— 线程入口(thread.cc)

4.5 join()(thread.cc)

4.6 静态方法

五、Fiber 类详解

5.1 定义(fiber.h)

5.2 协程状态机

5.3 两个核心构造函数

5.3.1 主协程构造 —— Fiber()(fiber.cc)

5.3.2 子协程构造 —— Fiber(cb, stacksize, use_caller)(fiber.cc)

5.4 析构函数(fiber.cc)

5.5 协程切换:swapIn / swapOut / call / back

5.5.1 swapIn() —— 切换到子协程执行(fiber.cc)

5.5.2 swapOut() —— 子协程让出(fiber.cc)

5.5.3 call() —— 从线程主协程切换到子协程(fiber.cc)

5.5.4 back() —— 子协程回到线程主协程(fiber.cc)

5.6 执行入口:MainFunc 与 CallerMainFunc

5.6.1 MainFunc()(fiber.cc)

5.6.2 CallerMainFunc()(fiber.cc)

5.7 静态方法

5.7.1 GetThis() —— 获取当前协程(fiber.cc)

5.7.2 YieldToReady() / YieldToHold()(fiber.cc)

六、线程内的协程结构

七、Mutex 与同步原语详解

7.1 整体概览

7.2 Semaphore(mutex.h)

7.3 Mutex 与 RAII 局部锁(mutex.h)

7.4 Spinlock(mutex.h)

7.5 CASLock(mutex.h)

八、线程ID与协程ID

8.1 GetThreadId()(util.cc)

8.2 GetFiberId()(util.cc / fiber.cc)

九、线程安全总结

十、完整调用链梳理

步骤 1:创建协程

步骤 2:调度器选择该协程,调用 swapIn()

步骤 3:进入 MainFunc()

步骤 4:YieldToReady() 让出

步骤 5:调度器再次调度

步骤 6:用户lambda继续执行

步骤 7:MainFunc() 收尾

十一、学习验证清单

结语

---⭐️封面自取⭐️---



前言:

        本项目是基于小电视里sylar大佬的项目来做的一个项目总结,其多为一些项目思考与笔记,可能还会有一些图解之类的讲解,但光看本专栏学习此项目肯定是不足的,多去跟着视频敲敲代码或者自己下去实现实现各个模块。由于小生经验不足,这个系列专栏制作周期可能会稍微有点长,甚至有可能会出现断更的情况,但我尽量往完写,望各位大佬多多包涵。

        那么这一次我们就开始我们线程与协程的模块了,Let`s go.

一、线程与协程模块介绍

       我们高并发服务器都有的核心矛盾是,一个网络服务器要同时处理成千上万个连接,传统方案面临两难:

方案 优点 缺点
一个连接一个线程 代码直观,阻塞读写很自然 线程数爆炸,上下文切换开销巨大(Linux 线程栈默认 8MB,1万个连接 = 80GB 虚拟内存)
单线程 + epoll + 回调 资源占用低,事件驱动高效 回调地狱(Callback Hell),业务逻辑被拆成碎片,代码难以维护

那么我们的协程就是第三种选择:

  • 写代码像多线程阻塞模型一样直观(同步写法)。

  • 底层执行像事件驱动模型一样高效(单线程内异步调度)。

  • 资源占用极低(sylar 默认协程栈 128KB,1万个协程 ≈ 1.2GB,且可动态调整)。

我们先来看看此日志管理器的整体架构设计:

二、整体架构图


三、基础概念:进程、线程、协程

3.1 三者对比

维度 进程(Process) 线程(Thread) 协程(Coroutine/Fiber)
定义 资源分配的基本单位 CPU调度的基本单位 用户态的轻量级执行流
地址空间 独立 共享所属进程的空间 共享所属线程的空间
切换开销 最大(需切换页表、TLB刷新) 中等(需切换寄存器、栈) 最小(纯用户态,只换上下文)
切换权限 内核态 内核态 用户态
并发性 多核并行 多核并行 单线程内并发,本质串行
数据共享 IPC(管道、共享内存等) 直接读写共享内存 直接读写共享内存
栈空间 独立 独立(通常1~8MB) 独立(可自定义,如128KB)
创建销毁开销 极小

一句话总结:

  • 进程是“资源的围墙”,隔离性最强;

  • 线程是“CPU的执行单位”,多线程可同时跑在多核上;

  • 协程是“用户自己管理的执行流”,切换不经过内核,代价极低。


3.2 Linux 用户态上下文切换:ucontext

sylar 的协程基于 POSIX 的 <ucontext.h>,核心是三个函数:

函数 作用
getcontext(ucontext_t *ucp) 将当前CPU上下文保存到 ucp
setcontext(const ucontext_t *ucp) 跳转到 ucp 中保存的上下文执行(不返回)
swapcontext(ucontext_t *oucp, const ucontext_t *ucp) 先保存当前上下文到 oucp,再跳转到 ucp
makecontext(ucontext_t *ucp, void (*func)(), argc, ...) 修改 ucp,使其在激活时执行 func

关键理解:

  • ucontext_t 内部保存了:通用寄存器、信号掩码、栈指针(rsp)、指令指针(rip)等。

  • makecontext 必须配合 uc_stack 使用,即事先指定好该上下文使用的栈空间。

  • swapcontext 是协程切换的灵魂:A协程调用 swapcontext(&A_ctx, &B_ctx),CPU立刻去执行B,将来某个时刻B再 swapcontext(&B_ctx, &A_ctx) 回来。


四、Thread 类详解

4.1 定义(thread.h)

class Thread : Noncopyable {
public:
    /// 线程智能指针类型
    typedef std::shared_ptr<Thread> ptr;

    // 构造函数,线程执行函数,线程名称
    Thread(std::function<void()> cb, const std::string& name);

    ~Thread();

    // 线程ID
    pid_t getId() const { return m_id;}
    // 线程名称
    const std::string& getName() const { return m_name;}
    // 等待线程执行完成
    void join();
    // 获取当前的线程指针
    static Thread* GetThis();
    // 取当前的线程名称
    static const std::string& GetName();
    // 设置当前线程名称,线程名称
    static void SetName(const std::string& name);
private:

    // 线程执行函数
    static void* run(void* arg);
private:
    /// 线程id
    pid_t m_id = -1;
    /// 线程结构
    pthread_t m_thread = 0;
    /// 线程执行函数
    std::function<void()> m_cb;
    /// 线程名称
    std::string m_name;
    /// 信号量
    Semaphore m_semaphore;
};

}

逐成员拆解:

成员 类型 说明
m_id pid_t 线程的真实OS ID(通过 syscall(SYS_gettid) 获取),不是 pthread_t
m_thread pthread_t POSIX 线程句柄,用于 pthread_join / pthread_detach
m_cb std::function<void()> 线程要执行的用户函数
m_name std::string 线程名称,用于日志和调试
m_semaphore Semaphore 信号量,保证构造函数返回时线程已启动

设计要点:

  • 继承 Noncopyable:线程句柄不可拷贝,避免重复释放或重复 join。

  • m_semaphore 的作用:构造函数中调用 pthread_create 后,立即 wait();子线程在 run() 中初始化完毕后再 notify()。这确保用户拿到 Thread 对象时,线程已经在运行。


4.2 构造函数(thread.cc)

Thread::Thread(std::function<void()> cb, const std::string& name)
    :m_cb(cb)
    ,m_name(name) {
    if(name.empty()) {
        m_name = "UNKNOW";
    }
    int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
    if(rt) {
        SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt
            << " name=" << name;
        throw std::logic_error("pthread_create error");
    }
    m_semaphore.wait();
}

逐行解析:

  1. m_cb(cb), m_name(name):初始化列表保存用户回调和名称。

  2. if(name.empty()):默认名称 "UNKNOW"

  3. pthread_create(...):创建内核线程,入口是静态函数 Thread::run,参数传 this

  4. m_semaphore.wait()阻塞等待,直到子线程在 run() 中调用 notify()。这解决了“对象已构造但线程还没初始化完”的竞态问题。


4.3 析构函数(thread.cc)

Thread::~Thread() {
    if(m_thread) {
        pthread_detach(m_thread);
    }
}
  • 如果线程句柄还在(未 join),析构时自动 detach,避免资源泄漏。

  • 注意:如果用户已经 join()m_thread 会被置 0(见 4.5),此处不会重复 detach。


4.4 run() —— 线程入口(thread.cc)

void* Thread::run(void* arg) {
    Thread* thread = (Thread*)arg;
    t_thread = thread;
    t_thread_name = thread->m_name;
    thread->m_id = sylar::GetThreadId();
    pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
​
    std::function<void()> cb;
    cb.swap(thread->m_cb);
​
    thread->m_semaphore.notify();
​
    cb();
    return 0;
}

关键步骤拆解:

步骤 代码 说明
1 t_thread = thread 设置线程局部变量,记录当前线程对象指针
2 t_thread_name = ... 设置线程局部变量,记录当前线程名称
3 thread->m_id = GetThreadId() 获取真实OS线程ID
4 pthread_setname_np(...) 将线程名称设到内核(最多15字符,截断)
5 cb.swap(thread->m_cb) 将回调移出对象,避免持有 Thread 对象的引用
6 m_semaphore.notify() 通知构造函数:线程已初始化完毕
7 cb() 执行用户代码

thread_local 变量:

static thread_local Thread* t_thread = nullptr;
static thread_local std::string t_thread_name = "UNKNOW";
  • thread_local 是 C++11 关键字,每个线程有独立副本。

  • GetThis()GetName() 就是通过读取这些线程局部变量实现的。


4.5 join()(thread.cc)

void Thread::join() {
    if(m_thread) {
        int rt = pthread_join(m_thread, nullptr);
        if(rt) {
            SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail...";
            throw std::logic_error("pthread_join error");
        }
        m_thread = 0;
    }
}
  • pthread_join 阻塞等待线程结束。

  • 成功后 m_thread = 0,析构时不再 detach


4.6 静态方法

Thread* Thread::GetThis() {
    return t_thread;
}
​
const std::string& Thread::GetName() {
    return t_thread_name;
}
​
void Thread::SetName(const std::string& name) {
    if(name.empty()) return;
    if(t_thread) {
        t_thread->m_name = name;
    }
    t_thread_name = name;
}
  • 只有在线程内部调用 GetThis() 才返回非空。主线程中(未通过 Thread 类创建)返回 nullptr


五、Fiber 类详解

5.1 定义(fiber.h)

class Fiber : public std::enable_shared_from_this<Fiber> {
friend class Scheduler;
public:
    typedef std::shared_ptr<Fiber> ptr;

    // 协程状态
    enum State {
        /// 初始化状态
        INIT,
        /// 暂停状态
        HOLD,
        /// 执行中状态
        EXEC,
        /// 结束状态
        TERM,
        /// 可执行状态
        READY,
        /// 异常状态
        EXCEPT
    };
private:
    // 无参构造函数, 每个线程第一个协程的构造
    Fiber();

public:
    // 构造函数, 协程执行的函数, 协程栈大小, 是否在MainFiber上调度
    Fiber(std::function<void()> cb, size_t stacksize = 0, bool use_caller = false);

    ~Fiber();

    // 重置协程执行函数,并设置状态
    // getState() 为 INIT, TERM, EXCEPT
    // getState() = INIT
    void reset(std::function<void()> cb);

    // 将当前协程切换到运行状态
    // getState() != EXEC
    // getState() = EXEC
    void swapIn();

    // 将当前协程切换到后台
    void swapOut();

    // 将当前线程切换到执行状态, 执行的为当前线程的主协程
    void call();

    // 将当前线程切换到后台
    // 执行的为该协程
    // 返回到线程的主协程
    void back();

    // 返回协程id
    uint64_t getId() const { return m_id;}

    // 返回协程状态
    State getState() const { return m_state;}
public:

    // 设置当前线程的运行协程, f 运行协程
    static void SetThis(Fiber* f);

    // 返回当前所在的协程
    static Fiber::ptr GetThis();

    // 将当前协程切换到后台,并设置为READY状态
    // getState() = READY
    static void YieldToReady();

    // 将当前协程切换到后台,并设置为HOLD状态
    // getState() = HOLD
    static void YieldToHold();

    // 返回当前协程的总数量
    static uint64_t TotalFibers();

    // 协程执行函数, 执行完成返回到线程主协程
    static void MainFunc();

    // 协程执行函数, 执行完成返回到线程调度协程
    static void CallerMainFunc();

    // 获取当前协程的id
    static uint64_t GetFiberId();
private:
    /// 协程id
    uint64_t m_id = 0;
    /// 协程运行栈大小
    uint32_t m_stacksize = 0;
    /// 协程状态
    State m_state = INIT;
    /// 协程上下文
    ucontext_t m_ctx;
    /// 协程运行栈指针
    void* m_stack = nullptr;
    /// 协程运行函数
    std::function<void()> m_cb;
};

}

设计要点:

  • friend class Scheduler:调度器需要直接访问协程私有成员(如 m_ctx)。

  • enable_shared_from_thisGetThis() 中需要返回 shared_from_this()

  • 无参构造函数 Fiber()private:只有类内部能创建“主协程”。

5.2 协程状态机

状态 含义 转移条件
INIT 刚创建或 reset() swapIn() / call()EXEC
EXEC 正在运行 swapOut()READY/HOLD;执行完 → TERM
READY 可执行(被 YieldToReady 让出) 调度器再次选中 → EXEC
HOLD 暂停(被 YieldToHold 让出) 调度器再次选中 → EXEC
TERM 正常结束 不可再转移
EXCEPT 异常结束 不可再转移

5.3 两个核心构造函数

5.3.1 主协程构造 —— Fiber()(fiber.cc)

Fiber::Fiber() {
    m_state = EXEC;
    SetThis(this);
​
    if(getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }
​
    ++s_fiber_count;
    SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber main";
}

关键:

  • m_state = EXEC:主协程天然就在执行中。

  • SetThis(this):设置线程局部变量 t_fiber

  • getcontext(&m_ctx):保存当前CPU上下文(即线程原生执行流的上下文)。

  • 没有分配栈m_stack = nullptr):主协程使用的是线程本身的栈。

  • 这是 private 的,只能由 GetThis() 在首次调用时自动创建。

5.3.2 子协程构造 —— Fiber(cb, stacksize, use_caller)(fiber.cc)

Fiber::Fiber(std::function<void()> cb, size_t stacksize, bool use_caller)
    :m_id(++s_fiber_id)
    ,m_cb(cb) {
    ++s_fiber_count;
    m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();
​
    m_stack = StackAllocator::Alloc(m_stacksize);
    if(getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }
    m_ctx.uc_link = nullptr;
    m_ctx.uc_stack.ss_sp = m_stack;
    m_ctx.uc_stack.ss_size = m_stacksize;
​
    if(!use_caller) {
        makecontext(&m_ctx, &Fiber::MainFunc, 0);
    } else {
        makecontext(&m_ctx, &Fiber::CallerMainFunc, 0);
    }
​
    SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber id=" << m_id;
}

逐行解析:

代码 说明
m_id(++s_fiber_id) 原子自增,全局唯一协程ID
m_stacksize = ... 用户指定或读取配置(默认128KB)
StackAllocator::Alloc(...) malloc 分配协程私有栈
getcontext(&m_ctx) 获取当前上下文作为模板
uc_link = nullptr 协程结束后不自动跳转到其他上下文(由 MainFunc 手动控制)
uc_stack.ss_sp = m_stack 绑定栈空间,这是协程能独立运行的核心
uc_stack.ss_size = ... 栈大小
makecontext(..., MainFunc, 0) 修改上下文入口为 MainFunc,参数个数为0

use_caller 的作用:

  • false(默认):协程执行完后通过 swapOut() 回到调度器主协程Scheduler::GetMainFiber())。

  • true:协程执行完后通过 back() 回到线程主协程t_threadFiber)。调度器在 use_caller=true 时使用。


5.4 析构函数(fiber.cc)

Fiber::~Fiber() {
    --s_fiber_count;
    if(m_stack) {
        SYLAR_ASSERT(m_state == TERM || m_state == EXCEPT || m_state == INIT);
        StackAllocator::Dealloc(m_stack, m_stacksize);
    } else {
        SYLAR_ASSERT(!m_cb);
        SYLAR_ASSERT(m_state == EXEC);
        Fiber* cur = t_fiber;
        if(cur == this) {
            SetThis(nullptr);
        }
    }
    SYLAR_LOG_DEBUG(g_logger) << "Fiber::~Fiber id=" << m_id;
}

分支逻辑:

  • if(m_stack)子协程。必须处于结束状态(TERM/EXCEPT)或未执行(INIT),才能释放栈。正在执行的协程不能析构。

  • else主协程。主协程没有独立栈,析构时如果它是当前协程,需要清空 t_fiber


5.5 协程切换:swapIn / swapOut / call / back

这是协程最核心的四个方法,本质都是 swapcontext 的封装。

5.5.1 swapIn() —— 切换到子协程执行(fiber.cc)

void Fiber::swapIn() {
    SetThis(this);
    SYLAR_ASSERT(m_state != EXEC);
    m_state = EXEC;
    if(swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}
  • 调度器主协程切换到当前子协程。

  • 保存调度器上下文,恢复子协程上下文。

  • 使用场景:调度器选择一个任务,调用其 swapIn()

5.5.2 swapOut() —— 子协程让出(fiber.cc)

void Fiber::swapOut() {
    SetThis(Scheduler::GetMainFiber());
    if(swapcontext(&m_ctx, &Scheduler::GetMainFiber()->m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}
  • 从当前子协程切换回调度器主协程

  • 注意swapOut() 不修改自身状态(m_state 保持 EXEC),由调用者(如 YieldToReady)负责修改。

5.5.3 call() —— 从线程主协程切换到子协程(fiber.cc)

void Fiber::call() {
    SetThis(this);
    m_state = EXEC;
    if(swapcontext(&t_threadFiber->m_ctx, &m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}
  • 线程主协程t_threadFiber)切换到子协程。

  • 使用场景use_caller=true 的调度器模式,或者非调度器场景下直接运行协程。

5.5.4 back() —— 子协程回到线程主协程(fiber.cc)

void Fiber::back() {
    SetThis(t_threadFiber.get());
    if(swapcontext(&m_ctx, &t_threadFiber->m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}
  • call() 配对,从子协程回到线程主协程。


5.6 执行入口:MainFunc 与 CallerMainFunc

5.6.1 MainFunc()(fiber.cc)

void Fiber::MainFunc() {
    Fiber::ptr cur = GetThis();
    SYLAR_ASSERT(cur);
    try {
        cur->m_cb();
        cur->m_cb = nullptr;
        cur->m_state = TERM;
    } catch (std::exception& ex) {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
            << " fiber_id=" << cur->getId();
    } catch (...) {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except"
            << " fiber_id=" << cur->getId();
    }
​
    auto raw_ptr = cur.get();
    cur.reset();           // 释放 shared_ptr,引用计数-1
    raw_ptr->swapOut();    // 切换回调度器主协程
​
    SYLAR_ASSERT2(false, "never reach");
}

关键设计:

步骤 说明
cur->m_cb() 执行用户回调
cur->m_cb = nullptr 释放函数对象,避免持有资源
cur->m_state = TERM 标记正常结束
cur.reset() 必须GetThis() 返回的 shared_ptr 会增加引用计数,如果不释放,协程对象永远无法析构。
raw_ptr->swapOut() 协程结束,切回调度器。由于引用已释放,调度器里的 shared_ptr 如果也是最后一个引用,协程会在切走后被销毁。
SYLAR_ASSERT2(false, ...) 理论上永远不会执行到,因为 swapOut 不会返回。

5.6.2 CallerMainFunc()(fiber.cc)

MainFunc() 几乎相同,唯一的区别是最后调用 raw_ptr->back() 而不是 swapOut()

  • back() 回到线程主协程

  • 用于 use_caller=true 的场景(调度器线程本身也参与执行任务)。


5.7 静态方法

5.7.1 GetThis() —— 获取当前协程(fiber.cc)

Fiber::ptr Fiber::GetThis() {
    if(t_fiber) {
        return t_fiber->shared_from_this();
    }
    Fiber::ptr main_fiber(new Fiber);
    SYLAR_ASSERT(t_fiber == main_fiber.get());
    t_threadFiber = main_fiber;
    return t_fiber->shared_from_this();
}

懒加载主协程:

  • 如果当前线程还没有创建主协程(t_fiber == nullptr),自动调用私有构造函数 Fiber() 创建一个。

  • t_threadFiber 持有主协程的 shared_ptr,防止其被销毁。

  • 这是线程安全的:每个线程各自调用,各自创建自己的主协程。

5.7.2 YieldToReady() / YieldToHold()(fiber.cc)

void Fiber::YieldToReady() {
    Fiber::ptr cur = GetThis();
    SYLAR_ASSERT(cur->m_state == EXEC);
    cur->m_state = READY;
    cur->swapOut();
}
​
void Fiber::YieldToHold() {
    Fiber::ptr cur = GetThis();
    SYLAR_ASSERT(cur->m_state == EXEC);
    cur->swapOut();  // 注意:这里不修改状态,保持原状态(通常是HOLD)
}
  • YieldToReady:显式让出,并标记为可立即再次调度。

  • YieldToHold:显式让出,状态保持 EXEC 或被外部设为 HOLD。通常用于等待 IO 事件完成。


六、线程内的协程结构

每个线程内部,sylar 维护了以下 thread_local 变量:

// fiber.cc
static thread_local Fiber* t_fiber = nullptr;          // 当前正在运行的协程
static thread_local Fiber::ptr t_threadFiber = nullptr; // 线程主协程(原生执行流)
​
// scheduler.cc
static thread_local Scheduler* t_scheduler = nullptr;   // 当前线程所属的调度器
static thread_local Fiber* t_scheduler_fiber = nullptr; // 调度器主协程

关系图:

两种调度模式:

模式 use_caller 特点
独立调度器线程 false 调度器运行在独立线程,所有工作线程通过 swapIn/swapOut 与调度器主协程交互
主线程即调度线程 true 创建 Scheduler 的线程自身也成为工作线程,使用 call/back 切换

七、Mutex 与同步原语详解

7.1 整体概览

类名 底层实现 特点 适用场景
Semaphore sem_t (POSIX无名信号量) 可跨线程等待/通知 线程启动同步、资源池
Mutex pthread_mutex_t 常规互斥锁 保护临界区
RWMutex pthread_rwlock_t 读共享、写独占 读多写少的场景
Spinlock pthread_spinlock_t 自旋等待,不挂起线程 极短临界区(如日志系统的锁)
CASLock std::atomic_flag 原子操作实现,最轻量 标志位保护、调试替代
NullMutex / NullRWMutex 空实现 无开销,用于调试或禁用锁 模板参数替换

7.2 Semaphore(mutex.h)

class Semaphore : Noncopyable {
public:
    Semaphore(uint32_t count = 0);
    ~Semaphore();
    void wait();   // P操作,count--,为0则阻塞
    void notify(); // V操作,count++,唤醒等待者
private:
    sem_t m_semaphore;
};
  • Thread 构造函数中:wait() 在父线程,notify() 在子线程,完成启动握手。


7.3 Mutex 与 RAII 局部锁(mutex.h)

class Mutex : Noncopyable {
public:
    typedef ScopedLockImpl<Mutex> Lock;
    void lock()   { pthread_mutex_lock(&m_mutex); }
    void unlock() { pthread_mutex_unlock(&m_mutex); }
private:
    pthread_mutex_t m_mutex;
};

ScopedLockImpl 模板(mutex.h):

template<class T>
struct ScopedLockImpl {
    ScopedLockImpl(T& mutex) :m_mutex(mutex) {
        m_mutex.lock();
        m_locked = true;
    }
    ~ScopedLockImpl() {
        unlock();
    }
    void lock() { ... }
    void unlock() { ... }
private:
    T& m_mutex;
    bool m_locked;
};

使用方式:

{
    MutexType::Lock lock(m_mutex);  // 构造时加锁
    // 临界区...
}  // 析构时自动解锁

设计要点:

  • 模板化:一套 ScopedLockImpl 适配 MutexSpinlockCASLock

  • m_locked 标记:支持中途 unlock() 后再次 lock(),避免重复解锁。


7.4 Spinlock(mutex.h)

class Spinlock : Noncopyable {
public:
    typedef ScopedLockImpl<Spinlock> Lock;
    void lock()   { pthread_spin_lock(&m_mutex); }
    void unlock() { pthread_spin_unlock(&m_mutex); }
private:
    pthread_spinlock_t m_mutex;
};
  • 自旋锁:一种基于忙等待的锁机制,它是一种轻量级的锁实现方式。 与传统的阻塞锁不同,自旋锁在获取锁时不会主动阻塞线程,而是通过循环不断地尝试获取锁,直到成功获取为止。用法和互斥锁几乎一样,只不过机制不同。

  • 适用于以下情况:

    • 锁的保持时间很短:如果临界区的代码执行时间很短,使用自旋锁可以避免线程切换的开销,从而提高性能。

    • 并发冲突较少:自旋锁适用于并发冲突较少的情况。如果临界区的竞争激烈,自旋锁可能会导致大量的线程空转,浪费CPU资源。

    • 不可阻塞:自旋锁要求获取锁的操作是非阻塞的,即不会引起线程的挂起或阻塞。

        如果获取锁的操作可能会引起线程的阻塞,使用自旋锁就不合适,应该选择其他类型的锁。

  • 如果临界区很长,自旋锁会浪费大量CPU。


7.5 CASLock(mutex.h)

class CASLock : Noncopyable {
public:
    typedef ScopedLockImpl<CASLock> Lock;
    void lock() {
        while(std::atomic_flag_test_and_set_explicit(
            &m_mutex, std::memory_order_acquire));
    }
    void unlock() {
        std::atomic_flag_clear_explicit(
            &m_mutex, std::memory_order_release);
    }
private:
    volatile std::atomic_flag m_mutex;
};
  • 什么是CAS自旋锁:

    • 通俗点说,当我们想修改一个值时,我们会先将这个值和原先的值进行比较,如果发现和原先的值一样,那么我们再进行修改。CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。更新一个变量时,只有当预期值A和内存地址V中的实际值相同时,才会将内存地址对应的值修改为B。如果发现不一致,则会重新进行尝试,这个尝试的过程被称为自旋。

  • 纯C++11原子操作实现,不依赖 pthread。

  • test_and_set:原子地读取旧值并设为 true。如果旧值为 false,表示获取锁成功;否则循环等待。

  • memory_order_acquire/release:保证锁内操作不被重排到锁外(内存屏障)。


八、线程ID与协程ID

8.1 GetThreadId()(util.cc)

pid_t GetThreadId() {
    return syscall(SYS_gettid);
}
  • SYS_gettid:Linux 特有的系统调用,返回线程的真实 PID(在 top -H 中可见)。

  • 注意pthread_self() 返回的是 pthread 库内部标识,不是 OS 可见的线程ID。

  • sylar 日志中 %t 输出的就是 GetThreadId()


8.2 GetFiberId()(util.cc / fiber.cc)

uint32_t GetFiberId() {
    return sylar::Fiber::GetFiberId();
}
uint64_t Fiber::GetFiberId() {
    if(t_fiber) {
        return t_fiber->getId();
    }
    return 0;
}
  • 协程ID是纯用户态生成的原子递增序列(++s_fiber_id)。

  • 主线程中如果未创建协程,返回 0。


九、线程安全总结

保护对象 锁类型
Logger m_appenders, m_formatter Spinlock
LogAppender m_formatter Spinlock
LoggerManager m_loggers Spinlock
Thread 无(除 m_semaphore 内部) Semaphore(启动握手)
Fiber s_fiber_id, s_fiber_count std::atomic
Scheduler 任务队列、线程池 Mutex / Spinlock

关键设计:

  • Fiber 本身不加锁:一个协程同一时刻只会在一个线程上运行,不存在多线程竞争协程状态的问题。

  • 协程切换时,需要确保 t_fibert_threadFiberthread_local 变量被正确更新,否则 GetThis() 会返回错误结果。

  • std::function 和栈数据(m_cb, m_stack)不需要加锁,因为它们只被当前运行的线程访问。


十、完整调用链梳理

以调度器调度一个协程为例:

步骤 1:创建协程

Fiber::ptr fiber(new Fiber([](){
    std::cout << "Hello Fiber" << std::endl;
    Fiber::YieldToReady();
    std::cout << "Fiber resumed" << std::endl;
}));

步骤 2:调度器选择该协程,调用 swapIn()

fiber->swapIn();

内部执行:

SetThis(this);              // t_fiber = fiber
m_state = EXEC;
swapcontext(&scheduler_ctx, &fiber_ctx);
// 调度器主协程被挂起,CPU开始执行 fiber 的 MainFunc

步骤 3:进入 MainFunc()

cur->m_cb();                // 执行用户lambda
// 输出 "Hello Fiber"
// 调用 YieldToReady()

步骤 4:YieldToReady() 让出

cur->m_state = READY;
cur->swapOut();
// swapcontext(&fiber_ctx, &scheduler_ctx)
// 回到调度器主协程

步骤 5:调度器再次调度

一段时间后,调度器再次选择该协程:

fiber->swapIn();            // 再次 swapcontext,从 YieldToReady() 的下一行恢复

步骤 6:用户lambda继续执行

// 输出 "Fiber resumed"
// lambda 结束

步骤 7:MainFunc() 收尾

cur->m_state = TERM;
cur.reset();                // 释放 shared_ptr
raw_ptr->swapOut();         // 回到调度器
// 协程结束,调度器可以将其从队列移除

十一、学习验证清单

学完后,你应该能:

  • 解释 pthread_create 后为什么需要 m_semaphore.wait()/notify() 来握手。
  • 说出 thread_local 的作用,以及 t_threadt_fibert_threadFiber 三者的区别。
  • 解释主协程(Fiber())和子协程(Fiber(cb, ...))在构造上的核心差异(栈分配、makecontext)。
  • 说明 swapIn/swapOutcall/back 的使用场景区别。
  • 解释 MainFunc()cur.reset() 的作用,如果没有这行会发生什么。
  • 画出协程状态转换图,说明 INITEXECREADYHOLDTERM 之间的转移条件。
  • 解释 ucontext 的四个函数(getcontextmakecontextswapcontextsetcontext)各自的作用。
  • 对比 MutexSpinlockCASLock 的底层实现和适用场景。
  • 说明 syscall(SYS_gettid)pthread_self() 返回的 ID 有何不同。
  • 解释为什么协程切换不需要陷入内核,而线程切换需要。
  • 建议自己实现一个最小线程+协程系统,包含:
  1. 线程封装

    • class Thread { pthread_t m_thread; static void* run(void*); };

    • 支持 join()GetThis()

    • thread_local 保存当前线程指针。

  2. 协程封装(基于 ucontext)

    • class Fiber { ucontext_t m_ctx; char* m_stack; std::function<void()> m_cb; };

    • 支持无参构造(主协程)和有参构造(子协程)。

    • 实现 swapIn() / swapOut()

    • 实现 GetThis() 懒加载主协程。

  3. 调度器(极简版)

    • class Scheduler { std::vector<Fiber::ptr> m_fibers; };

    • schedule(Fiber::ptr) 入队。

    • run() 循环:取出队首 → swapIn() → 协程让出后回到 run()

  4. 锁封装

    • class Mutex { pthread_mutex_t m_mutex; }

    • template<class T> class ScopedLock { T& m; ~ScopedLock() { m.unlock(); } };


结语

        OK啊,我们的线程与协程就完结了,此处的内容也是很重要的,是我们高性能服务器的基础,希望这个系列能帮到你。

我是YYYing,后面还有更精彩的内容,希望各位能多多关注支持一下主包。

无限进步,我们下次再见!


---⭐️封面自取⭐️---

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐