本期我们接着深入项目编写

        相关代码上传至作者的个人gitee:仿muduo服务器: 本项目致力于实现一个仿造muduo库的简易并发服务器,为个人项目,参考即可喜欢请点个赞谢谢

目录

LoopThread模块

        设计思想

        源码

LoopThreadPool模块

        设计思想

        源码


LoopThread模块

        设计思想

        目标:将EventLoop模块与线程整合起来  
        EventLoop模块与线程是一一对应的。EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,而后边当运行一个操作的时候判断当前是否运行在eventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是EventLoop线程。

        含义:EventLoop模块在实例化对象的时候,必须在线程内部。EventLoop实例化对象时会设置自己的thread_id。如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置  
        存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的。  
因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象

        源码

        LoopThread.hpp

#pragma once
#include "EventLoop.hpp"
#include <thread>
#include <mutex>
#include<condition_variable>
#include <atomic>
#include <functional>
namespace ImMuduo
{
    class LoopThread
    {
        public:
            LoopThread();
            ~LoopThread();
            //获取EventLoop智能指针
            EventLoop* GetLoop();
        private:
            //线程入口函数——创建EventLoop对象并启动模块功能
            void ThreadEntry();
            
        private:
            std::thread thread_;//EventLoop线程
            std::unique_ptr<EventLoop> loop_;//EventLoop智能指针,需要在线程内实例化
            //用于实现Loop操作的同步,避免在构造函数中创建EventLoop对象,导致线程创建失败
            std::mutex mutex_;
            std::condition_variable condition_;
    };
}

        LoopThread.cpp

#include "LoopThread.hpp"

namespace ImMuduo
{
    LoopThread::LoopThread()
        :thread_(&LoopThread::ThreadEntry, this)
        ,loop_(nullptr)
        ,mutex_()
        ,condition_()
    {}

    LoopThread::~LoopThread()
    {
        if (loop_)
        {
            loop_->Stop();
        }
        if (thread_.joinable())
        {
            thread_.join();
        }
    }

    EventLoop* LoopThread::GetLoop()
    {
        EventLoop* loop = nullptr;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            condition_.wait(lock, [this]() { return loop_ != nullptr; });
            loop = loop_.get();
        }
        return loop;
    }

    void LoopThread::ThreadEntry()
    {
        auto loop = std::make_unique<EventLoop>();
        {
            std::unique_lock<std::mutex> lock(mutex_);
            loop_ = std::move(loop);
            condition_.notify_one();
        }
        loop_->Start();
    }
}

        

LoopThreadPool模块

        设计思想

        针对LoopThread设计一个线程池:
LoopThreadPool模块:对所有的LoopThread进行管理及分配
        功能
1. 线程数量可配置(0个或多个)
   注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理
   因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及负责获取连接,也负责连接的处理

2. 对所有的线程进行管理,其实就是管理0个或多个LoopThread对象

3. 提供线程分配的功能
   当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理
   假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理
   假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)

        源码

        LoopThreadPool.hpp

#pragma once
#include"LoopThread.hpp"
namespace ImMuduo
{
    class LoopThreadPool
    {
        public:
            LoopThreadPool(EventLoop* eventLoop);
            ~LoopThreadPool();
            void SetThreadNum(int thread_num);
            void CreateThreads();
            EventLoop* NextLoop();
        private:
            EventLoop* loop_;
            int next_idx_;
            int thread_num_;
            std::vector<std::unique_ptr<LoopThread>> threads_;
            std::vector<EventLoop*> loops_;
    };
}

        LoopThreadPool.cpp

#include"LoopThreadPool.hpp"
namespace ImMuduo
{
    LoopThreadPool::LoopThreadPool(EventLoop* eventLoop)
        :next_idx_(0), thread_num_(0), loop_(eventLoop)
    {}
    LoopThreadPool::~LoopThreadPool()
    {
    }

    void LoopThreadPool::SetThreadNum(int thread_num)
    {
        thread_num_ = thread_num;
    }
    void LoopThreadPool::CreateThreads()
    {
        if (thread_num_ <= 0) return;
        threads_.reserve(thread_num_);
        loops_.reserve(thread_num_);
        for (int i = 0; i < thread_num_; ++i)
        {
            auto t = std::make_unique<LoopThread>();
            loops_.push_back(t->GetLoop());
            threads_.push_back(std::move(t));
        }
    }

    EventLoop* LoopThreadPool::NextLoop()
    {
        if (loops_.empty()) return loop_;
        EventLoop* ret = loops_[next_idx_];
        next_idx_ = (next_idx_ + 1) % static_cast<int>(loops_.size());
        return ret;
    }
}

        本期内容到这里就结束了,喜欢请点个赞谢谢

封面图自取:

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐