本期我们接着深入项目

        相关代码已经上传至gitee:仿muduo服务器: 本项目致力于实现一个仿造muduo库的简易并发服务器,为个人项目,参考即可喜欢请点个赞谢谢

目录

Connection模块

        设计思想

        源码

Acceptor模块

        设计思想

        源码


Connection模块

        设计思想

        目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成。

        管理
1. 套接字的管理,能够进行套接字的操作
2. 连接事件的管理,可读、可写、错误、挂断、任意
3. 缓冲区管理,便于socket数据的接收和发送
4. 协议上下文的管理,记录请求数据的处理过程
5. 回调函数的管理

因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数
一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数
一个连接关闭前,该如何处理,由用户决定,因此必须有连接关闭回调函数。
任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数

功能
1. 发送数据 --- 给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
2. 关闭连接 --- 给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
3. 启动非活跃连接的超时销毁功能
4. 取消非活跃连接的超时销毁功能
5. 协议切换 --- 一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数

        源码

        Connection.hpp

#pragma once
#include"Channel.hpp"
#include"Buffer.hpp"
#include"Socket.hpp"
#include"EventLoop.hpp"
#include <memory>
#include<any>
namespace ImMuduo
{
    enum class ConnectionState
    {
        Connected,//已连接
        Connecting,//连接中
        Disconnected,//已断开
        Disconnecting,//断开中
    };
    class Connection : public std::enable_shared_from_this<Connection>
    {
        using ConnectionPtr = std::shared_ptr<Connection>;
        using ConnectionCallback=std::function<void(const ConnectionPtr&)>;
        using MessageCallback=std::function<void(const ConnectionPtr&,Buffer*)>;
        using ClosedComplete=std::function<void(const ConnectionPtr&)>;
        using AnyCallback=std::function<void(const ConnectionPtr&)>;

        ConnectionCallback ConnectedCallback_;//链接的回调函数
        MessageCallback MessageCallback_;//链接的消息回调函数
        ClosedComplete ClosedCompleteCallback_;//链接的关闭完成回调函数
        AnyCallback AnyEventCallback_;//链接的任意回调函数
        // 组件内的连接关闭回调 - 组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭
        // 就应该从管理的地方移除掉自己的信息 
        ClosedComplete ServerClosedCompleteCallback_;//组件内的关闭完成回调函数
        public:
            Connection(int fd,uint64_t connId,EventLoop* loop,ConnectionCallback connectedCallback);
            ~Connection();
            //获取链接的文件描述符
            int GetFd()const;
            //获取链接的唯一ID
            uint64_t GetConnId()const;
            //获取链接的状态
            ConnectionState GetState()const;
            //判断链接是否处于Connected状态
            bool Connected()const;
            //设置链接的状态
            void SetState(ConnectionState state);
            //设置链接的上下文数据——链接完成时调用
            void SetContext(const std::any& context);
            //获取链接的上下文数据  
            std::any GetContext()const;
        public:
            //设置链接的回调函数
            void SetConnectedCallback(ConnectionCallback callback);
            //设置链接的消息回调函数
            void SetMessageCallback(MessageCallback callback);
            //设置链接的关闭完成回调函数
            void SetClosedCompleteCallback(ClosedComplete callback);
            //设置链接的任意回调函数
            void SetAnyEventCallback(AnyCallback callback);
        public:
            //发送数据
            void Send(const char* data,size_t len);
            //关闭链接——不是实际的关闭接口,提供给组件使用者
            void ShutDown();
            //启用不活动链接的释放,并且定义多长时间无通信后释放
            void EnableInactiveRelease(int sec);
            //取消不活动链接的释放
            void CancelInactiveRelease();
            //链接建立时调用,进行channel回调设置,启动读取监控
            void Established();
            //协议切换,将链接的上下文数据设置为新的协议上下文数据
            void UpgradeContext(const ConnectionCallback& callback,
                const MessageCallback& messageCallback,
                const ClosedComplete& closedCompleteCallback,
                const AnyCallback& anyCallback,
                const std::any& context);
        private:
            //五个链接事件的读取处理
            //描述符触发可读事件后进行的处理,接收socket数据放到缓冲区中,调用Message回调函数
            void HandleRead();
            //描述符触发可写事件后进行的处理,将缓冲区中的数据发送到socket
            void HandleWrite();
            //描述符触发关闭事件后进行的处理,关闭链接
            void HandleClose();
            //描述符触发错误事件后进行的处理,关闭链接
            void HandleError();
            //描述符触发任意事件后进行的处理,调用AnyCallback回调函数
            void HandleAnyEvent();
        private:
            //实际的释放关闭接口,只能内部执行
            void ReleaseInLoop();
            //连接获取之后,所处的状态下要进行各种设置(给channel设置事件回调,启动读取监控)
            void EstablishedInLoop(); 
            //Send的实际执行函数
            void SendInLoop(const char* data,size_t len);
            //ShutDown的实际执行函数
            void ShutDownInLoop();
            //EnableInactiveRelease的实际执行函数
            void EnableInactiveReleaseInLoop(int sec);
            //CancelInactiveRelease的实际执行函数
            void CancelInactiveReleaseInLoop();
            //UpgradeContext的实际执行函数
            void UpgradeContextInLoop(const ConnectionCallback& callback,
                const MessageCallback& messageCallback,
                const ClosedComplete& closedCompleteCallback,
                const AnyCallback& anyCallback,
                const std::any& context);
        private:
            int fd_;//链接的文件描述符
            uint64_t connId_;//链接的唯一ID
            ConnectionState state_;//链接的状态
            Socket socket_;//链接的socket
            Buffer inputBuffer_;//输入缓冲区
            Buffer outputBuffer_;//输出缓冲区
            Channel channel_;//链接的channel
            std::any context_;//链接的上下文数据
            bool EnableInactiveRelease_;//是否启用不活动链接的释放
            EventLoop* eventLoop_;//链接所属的事件循环
    };
}

        Connection.cpp

#include "Connection.hpp"
#include "Log.hpp"
#include <cstring>
#include <cassert>
#include <cerrno>

namespace ImMuduo
{
    Connection::Connection(int fd, uint64_t connId, EventLoop* loop,
                           ConnectionCallback connectedCallback)
        : ConnectedCallback_(std::move(connectedCallback)),
          fd_(fd), connId_(connId), state_(ConnectionState::Connecting),
          socket_(fd), channel_(fd, nullptr),
          EnableInactiveRelease_(false), eventLoop_(loop)
    {
        channel_.SetCloseCallback([this]() { HandleClose(); });
        channel_.SetReadCallback([this]() { HandleRead(); });
        channel_.SetWriteCallback([this]() { HandleWrite(); });
        channel_.SetErrorCallback([this]() { HandleError(); });
        channel_.SetEventCallback([this]() { HandleAnyEvent(); });
    }

    Connection::~Connection()
    {
        DEBUG("Connection released, fd=%d, connId=%lu", fd_, connId_);
    }

    // ========== 基础 getter/setter ==========
    int Connection::GetFd() const { return fd_; }
    uint64_t Connection::GetConnId() const { return connId_; }
    ConnectionState Connection::GetState() const { return state_; }

    bool Connection::Connected() const
    {
        return state_ == ConnectionState::Connected;
    }

    void Connection::SetState(ConnectionState state) { state_ = state; }

    void Connection::SetContext(const std::any& context) { context_ = context; }
    std::any Connection::GetContext() const { return context_; }

    // ========== 回调设置 ==========
    void Connection::SetConnectedCallback(ConnectionCallback callback)
    { ConnectedCallback_ = std::move(callback); }

    void Connection::SetMessageCallback(MessageCallback callback)
    { MessageCallback_ = std::move(callback); }

    void Connection::SetClosedCompleteCallback(ClosedComplete callback)
    { ClosedCompleteCallback_ = std::move(callback); }

    void Connection::SetAnyEventCallback(AnyCallback callback)
    { AnyEventCallback_ = std::move(callback); }

    // ========== 对外接口(dispatch 到 event loop) ==========
    void Connection::Send(const char* data, size_t len)
    {
        eventLoop_->RunInLoop([this, data = std::string(data, len)]() {
            SendInLoop(data.c_str(), data.size());
        });
    }

    void Connection::ShutDown()
    {
        eventLoop_->RunInLoop([this]() { ShutDownInLoop(); });
    }

    void Connection::EnableInactiveRelease(int sec)
    {
        eventLoop_->RunInLoop([this, sec]() { EnableInactiveReleaseInLoop(sec); });
    }

    void Connection::CancelInactiveRelease()
    {
        eventLoop_->RunInLoop([this]() { CancelInactiveReleaseInLoop(); });
    }

    void Connection::Established()
    {
        eventLoop_->RunInLoop([this]() { EstablishedInLoop(); });
    }

    void Connection::UpgradeContext(const ConnectionCallback& callback,
        const MessageCallback& messageCallback,
        const ClosedComplete& closedCompleteCallback,
        const AnyCallback& anyCallback,
        const std::any& context)
    {
        eventLoop_->RunInLoop([this, callback, messageCallback,
                               closedCompleteCallback, anyCallback, context]() {
            UpgradeContextInLoop(callback, messageCallback,
                                 closedCompleteCallback, anyCallback, context);
        });
    }

    // ========== 事件处理 ==========
    void Connection::HandleRead()
    {
        char buf[65536];
        ssize_t n = socket_.RecvNoBlock(buf, sizeof(buf));

        if (n > 0)
        {
            inputBuffer_.WriteAndPush(buf, n);
            if (inputBuffer_.ReadableSize() > 0 && MessageCallback_)
            {
                MessageCallback_(shared_from_this(), &inputBuffer_);
            }
        }
        else if (n == 0)
        {
            ShutDownInLoop();
        }
        else  // n < 0
        {
            if (errno != EAGAIN && errno != EINTR)
            {
                ShutDownInLoop();
            }
        }
    }

    void Connection::HandleWrite()
    {
        if (outputBuffer_.ReadableSize() == 0) return;

        ssize_t ret = socket_.SendNoBlock(outputBuffer_.ReadPos(),
                                           outputBuffer_.ReadableSize());
        if (ret > 0)
        {
            outputBuffer_.MoveReadPos(ret);
            if (outputBuffer_.ReadableSize() == 0)
            {
                channel_.DisableWrite();
                eventLoop_->UpdateEvent(&channel_);
                if (state_ == ConnectionState::Disconnecting)
                {
                    return ReleaseInLoop();
                }
            }
        }
        else if (ret < 0)
        {
            if (errno == EAGAIN || errno == EINTR) return;
            ERROR("HandleWrite send failed: %s", strerror(errno));
            ShutDownInLoop();
        }
    }

    void Connection::HandleClose()
    {
        if (inputBuffer_.ReadableSize() > 0 && MessageCallback_)
            MessageCallback_(shared_from_this(), &inputBuffer_);
        ReleaseInLoop();
    }

    void Connection::HandleError()
    {
        ERROR("Connection error: %s", strerror(errno));
        HandleClose();
    }

    void Connection::HandleAnyEvent()
    {
        if (EnableInactiveRelease_)
        {
            eventLoop_->TimerRefresh(connId_);
        }
        if (AnyEventCallback_)
        {
            AnyEventCallback_(shared_from_this());
        }
    }

    // ========== InLoop 实现 ==========
    void Connection::EstablishedInLoop()
    {
        assert(state_ == ConnectionState::Connecting);
        state_ = ConnectionState::Connected;
        channel_.EnableRead();
        eventLoop_->UpdateEvent(&channel_);
        if (ConnectedCallback_) ConnectedCallback_(shared_from_this());
    }

    void Connection::ReleaseInLoop()
    {
        state_ = ConnectionState::Disconnected;
        channel_.Remove();
        socket_.Close();
        if (eventLoop_->TimerExist(connId_))
        {
            CancelInactiveReleaseInLoop();
        }
        if (ClosedCompleteCallback_) ClosedCompleteCallback_(shared_from_this());
        if (ServerClosedCompleteCallback_) ServerClosedCompleteCallback_(shared_from_this());
    }

    void Connection::SendInLoop(const char* data, size_t len)
    {
        if (state_ == ConnectionState::Disconnected) return;
        outputBuffer_.WriteAndPush(data, len);
        if (!channel_.WriteAble())
        {
            channel_.EnableWrite();
            eventLoop_->UpdateEvent(&channel_);
        }
    }

    void Connection::ShutDownInLoop()
    {
        state_ = ConnectionState::Disconnecting;
        if (inputBuffer_.ReadableSize() > 0)
        {
            if (MessageCallback_) MessageCallback_(shared_from_this(), &inputBuffer_);
            channel_.DisableWrite();
            eventLoop_->UpdateEvent(&channel_);
        }
        if (outputBuffer_.ReadableSize() > 0)
        {
            if (!channel_.WriteAble())
            {
                channel_.EnableWrite();
                eventLoop_->UpdateEvent(&channel_);
            }
        }
        if (outputBuffer_.ReadableSize() == 0)
        {
            ReleaseInLoop();
        }
    }

    void Connection::EnableInactiveReleaseInLoop(int sec)
    {
        if (EnableInactiveRelease_) return;
        EnableInactiveRelease_ = true;
        if (eventLoop_->TimerExist(connId_))
        {
            return eventLoop_->TimerRefresh(connId_);
        }
        eventLoop_->TimerAdd(connId_, sec, [this]() {
            ReleaseInLoop();
        });
    }

    void Connection::CancelInactiveReleaseInLoop()
    {
        EnableInactiveRelease_ = false;
        if (eventLoop_->TimerExist(connId_))
        {
            eventLoop_->TimerCancel(connId_);
        }
    }

    void Connection::UpgradeContextInLoop(const ConnectionCallback& callback,
                const MessageCallback& messageCallback,
                const ClosedComplete& closedCompleteCallback,
                const AnyCallback& anyCallback,
                const std::any& context)
    {
        context_ = context;
        ConnectedCallback_ = callback;
        MessageCallback_ = messageCallback;
        ClosedCompleteCallback_ = closedCompleteCallback;
        AnyEventCallback_ = anyCallback;
    }
}

Acceptor模块

        设计思想

        Acceptor模块:对监听套接字进行管理

        1. 创建一个监听套接字  
        2. 启动读事件监控  
        3. 事件触发后,获取新连接  
        4. 调用新连接获取成功后的回调函数  

        5.为新连接创建Connection进行管理(这一步不是Acceptor模块操作,应该是服务器模块)  
因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心  

        对于新连接如何处理,应该是服务器模块来管理的,服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数

        源码

        Acceptor.hpp

#pragma once
#include"Socket.hpp"
#include"EventLoop.hpp" 
namespace ImMuduo
{
    class Acceptor
    {
        using AcceptorCallback=std::function<void(int)>;
        public:
            Acceptor(EventLoop* eventLoop,int port);
            ~Acceptor()=default;
            int CreateServer();
            void setCallback(AcceptorCallback callback);
        private:
            //  获取新链接,调用回调函数
            void HandleRead();
        private:
            AcceptorCallback callback_;//连接回调函数
            Socket socket_;//创建监听套接字
            EventLoop* eventLoop_;                 //监控监听套接字
            std::unique_ptr<Channel> channel_;     //监听套接字事件管理
    };
}

        Acceptor.cpp

#include "Acceptor.hpp"
#include"Log.hpp"
#include<cassert>
namespace ImMuduo
{
    Acceptor::Acceptor(EventLoop* eventLoop, int port)
        :eventLoop_(eventLoop)
    {
        socket_.CreateServer(port);
        channel_ = std::make_unique<Channel>(socket_.fd(), nullptr);
        channel_->EnableRead();
        channel_->SetReadCallback([this]() { HandleRead(); });
        eventLoop_->UpdateEvent(channel_.get());
    }
    int Acceptor::CreateServer()
    {
        return socket_.fd();
    }
    void Acceptor::setCallback(AcceptorCallback callback)
    {
        callback_ = callback;
    }
    void Acceptor::HandleRead()
    {
        int connfd=socket_.Accept();
        if(callback_)
        {
            callback_(connfd);
        }
    }
}

        本期内容到这里了,喜欢请点个赞谢谢

封面图自取:

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐