前面我们已经实现了对应的基础框架,本期我们就来写基于此之上的回显服务器

        相关代码已经上传:仿muduo服务器: 本项目致力于实现一个仿造muduo库的简易并发服务器,为个人项目,参考即可

目录

TcpServer模块

        设计思路

        源码

EchoServer模块

        实现

        服务器运行


TcpServer模块

        设计思路

        Tcp模块是对过往模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器的搭建管理:

1. Acceptor对象,创建一个监听套接字  
2. EventLoop对象,baseLoop对象,实现对监听套接字的事件监控  
3. std::unordered_map<uint64_t, PtrConnection> _conns,实现对所有新建连接的管理  
4. LoopThreadPool对象,创建loop线程池,对新建连接进行事件监控及处理  

功能
1. 设置从属线程池数量  
2. 启动服务器  
3. 设置各种回调函数(连接建立完成、消息、关闭、任意),用户设置给TcpServer,TcpServer设置给获取的新连接  
4. 是否启动非活跃连接超时销毁功能  
5. 添加定时任务功能

流程:

1. 在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(baseloop)

2. 将Acceptor挂到baseloop上进行事件监控

3. 一旦Acceptor对象就绪了可读事件,则执行读事件回调函数获取新建连接

4. 对新连接,创建一个Connection进行管理

5. 对连接对应的Connection设置功能回调(连接完成回调、消息回调、关闭回调、任意事件回调)

6. 启动Connection的非活跃连接的超时销毁规则

7. 将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的EventLoop中进行事件监控

8. 一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调

        源码

        TcpServer.hpp

#pragma once
#include"Acceptor.hpp"
#include"EventLoop.hpp"
#include"Channel.hpp"
#include"LoopThreadPool.hpp"
#include"Connection.hpp"
namespace ImMuduo 
{
    class TcpServer
    {
        using ConnectionCallback=std::function<void(const ConnectionPtr&)>;
        using MessageCallback=std::function<void(const ConnectionPtr&,Buffer*)>;
        using ClosedComplete=std::function<void(const ConnectionPtr&)>;
        using AnyCallback=std::function<void(const ConnectionPtr&)>;

        ConnectionCallback ConnectedCallback_;//链接的回调函数
        MessageCallback MessageCallback_;//链接的消息回调函数
        ClosedComplete ClosedCompleteCallback_;//链接的关闭完成回调函数
        AnyCallback AnyEventCallback_;//链接的任意回调函数
        // 组件内的连接关闭回调 - 组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭
        // 就应该从管理的地方移除掉自己的信息 
        ClosedComplete ServerClosedCompleteCallback_;//组件内的关闭完成回调函数
        public:
            TcpServer(int port);
            ~TcpServer();
            void SetLoopThreadCount(int thread_count);
            void start();
            void stop();
        public:
            //启用不活动链接的释放,并且定义多长时间无通信后释放
            void EnableInactiveRelease(int sec);
            //添加定时任务,用于检查不活动链接是否超时
            void RunAfter(double delay,const Functor& callback);
            //设置链接的回调函数
            void SetConnectedCallback(ConnectionCallback callback);
            //设置链接的消息回调函数
            void SetMessageCallback(MessageCallback callback);
            //设置链接的关闭完成回调函数
            void SetClosedCompleteCallback(ClosedComplete callback);
            //设置链接的任意回调函数
            void SetAnyEventCallback(AnyCallback callback);
        private:
            //为新连接构造connection对象管理
            void NewConnection(int connfd);
            //移除连接的connection对象管理信息
            void RemoveConnection(const ConnectionPtr& conn);
        private:
            int port_;//监听端口
            uint64_t conn_id_;//自动增长的ID
            bool enabled_inactive_release_;//是否启用连接超时释放
            EventLoop loop_;//主线程,负责监听事件处理(必须位于 acceptor_ 之前)
            LoopThreadPool threadpool_;//从属EventLoop的线程池
            Acceptor acceptor_;//监听套接字的管理对象
            std::unordered_map<uint64_t,ConnectionPtr> channels_;//保存所有对应的连接对象的shared_ptr映射表
    };
}   

        TcpServer.cpp

#include "TcpServer.hpp"
#include "Log.hpp"
#include <cassert>

namespace ImMuduo
{
    TcpServer::TcpServer(int port)
        : port_(port), conn_id_(0), enabled_inactive_release_(false),
          acceptor_(&loop_, port), threadpool_(&loop_)
    {
        acceptor_.setCallback([this](int connfd) {
            NewConnection(connfd);
        });
    }

    TcpServer::~TcpServer() {}

    void TcpServer::SetLoopThreadCount(int thread_count)
    {
        threadpool_.SetThreadNum(thread_count);
        threadpool_.CreateThreads();
    }

    void TcpServer::start()
    {
        loop_.Start();
    }

    void TcpServer::stop()
    {
        loop_.Stop();
    }

    void TcpServer::EnableInactiveRelease(int sec)
    {
        enabled_inactive_release_ = true;
        for (auto& kv : channels_)
        {
            kv.second->EnableInactiveRelease(sec);
        }
    }

    void TcpServer::RunAfter(double delay, const Functor& callback)
    {
        static uint64_t timer_id = 0;
        loop_.TimerAdd(++timer_id, static_cast<uint32_t>(delay), callback);
    }

    void TcpServer::SetConnectedCallback(ConnectionCallback callback)
    {
        ConnectedCallback_ = std::move(callback);
    }

    void TcpServer::SetMessageCallback(MessageCallback callback)
    {
        MessageCallback_ = std::move(callback);
    }

    void TcpServer::SetClosedCompleteCallback(ClosedComplete callback)
    {
        ClosedCompleteCallback_ = std::move(callback);
    }

    void TcpServer::SetAnyEventCallback(AnyCallback callback)
    {
        AnyEventCallback_ = std::move(callback);
    }

    void TcpServer::NewConnection(int connfd)
    {
        uint64_t id = ++conn_id_;
        EventLoop* worker = threadpool_.NextLoop();

        auto conn = std::make_shared<Connection>(
            connfd, id, worker, ConnectedCallback_);

        conn->SetMessageCallback(MessageCallback_);
        conn->SetClosedCompleteCallback(ClosedCompleteCallback_);
        conn->SetAnyEventCallback(AnyEventCallback_);
        conn->SetClosedCompleteCallback(ClosedCompleteCallback_);
        conn->SetServerClosedCompleteCallback(
            [this](const ConnectionPtr& c) { RemoveConnection(c); });

        if (enabled_inactive_release_)
        {
            conn->EnableInactiveRelease(10);
        }

        channels_[id] = conn;
        conn->Established();
    }

    void TcpServer::RemoveConnection(const ConnectionPtr& conn)
    {
        channels_.erase(conn->GetConnId());
    }
}

EchoServer模块

        实现

        EchoServer.hpp

#pragma once
#include"TcpServer.hpp"
#include"Buffer.hpp"
namespace ImMuduo 
{
    class EchoServer
    {
        public:
            EchoServer(int port);
            ~EchoServer()=default;
            void Start();
            void Stop();
        private:
            void OnConnected(const ConnectionPtr& conn);
            void OnClosed(const ConnectionPtr& conn);
            void OnMessage(const ConnectionPtr& conn, Buffer* buf);
        private:
            TcpServer tcp_server_;
    };
}

        EchoServer.cpp

#include "EchoServer.hpp"
#include "Log.hpp"

namespace ImMuduo
{
    EchoServer::EchoServer(int port)
        : tcp_server_(port)
    {
        tcp_server_.SetConnectedCallback(
            [this](const ConnectionPtr& conn) { OnConnected(conn); });
        tcp_server_.SetMessageCallback(
            [this](const ConnectionPtr& conn, Buffer* buf) {
                OnMessage(conn, buf);
            });
        tcp_server_.SetClosedCompleteCallback(
            [this](const ConnectionPtr& conn) { OnClosed(conn); });
    }

    void EchoServer::Start()
    {
        tcp_server_.SetLoopThreadCount(2);
        tcp_server_.start();
    }

    void EchoServer::Stop()
    {
        tcp_server_.stop();
    }

    void EchoServer::OnConnected(const ConnectionPtr& conn)
    {
        INFO("EchoServer: new connection, fd=%d, connId=%lu",
             conn->GetFd(), conn->GetConnId());
    }

    void EchoServer::OnMessage(const ConnectionPtr& conn, Buffer* buf)
    {
        conn->Send(buf->ReadPos(), buf->ReadableSize());
        buf->Clear();
    }

    void EchoServer::OnClosed(const ConnectionPtr& conn)
    {
        INFO("EchoServer: connection closed, fd=%d, connId=%lu",
             conn->GetFd(), conn->GetConnId());
    }
}

        服务器运行

#include"EchoServer.hpp"
#include"Log.hpp"
using namespace ImMuduo;
int main()
{
    EchoServer echo_server(8888);
    echo_server.Start();
    return 0;
}

        本期内容到这里结束了,喜欢请点个赞谢谢

封面图自取:

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐