仿Muduo的高并发服务器:Connection模块/Acceptor模块
本期我们接着深入项目。
本期我们接着深入项目
相关代码已经上传至gitee:仿muduo服务器: 本项目致力于实现一个仿造muduo库的简易并发服务器,为个人项目,参考即可喜欢请点个赞谢谢
目录
Connection模块
设计思想
目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成。
管理:
1. 套接字的管理,能够进行套接字的操作
2. 连接事件的管理,可读、可写、错误、挂断、任意
3. 缓冲区管理,便于socket数据的接收和发送
4. 协议上下文的管理,记录请求数据的处理过程
5. 回调函数的管理
因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数
一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数
一个连接关闭前,该如何处理,由用户决定,因此必须有连接关闭回调函数。
任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数
功能:
1. 发送数据 --- 给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
2. 关闭连接 --- 给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
3. 启动非活跃连接的超时销毁功能
4. 取消非活跃连接的超时销毁功能
5. 协议切换 --- 一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数
源码
Connection.hpp
#pragma once
#include"Channel.hpp"
#include"Buffer.hpp"
#include"Socket.hpp"
#include"EventLoop.hpp"
#include <memory>
#include<any>
namespace ImMuduo
{
enum class ConnectionState
{
Connected,//已连接
Connecting,//连接中
Disconnected,//已断开
Disconnecting,//断开中
};
class Connection : public std::enable_shared_from_this<Connection>
{
using ConnectionPtr = std::shared_ptr<Connection>;
using ConnectionCallback=std::function<void(const ConnectionPtr&)>;
using MessageCallback=std::function<void(const ConnectionPtr&,Buffer*)>;
using ClosedComplete=std::function<void(const ConnectionPtr&)>;
using AnyCallback=std::function<void(const ConnectionPtr&)>;
ConnectionCallback ConnectedCallback_;//链接的回调函数
MessageCallback MessageCallback_;//链接的消息回调函数
ClosedComplete ClosedCompleteCallback_;//链接的关闭完成回调函数
AnyCallback AnyEventCallback_;//链接的任意回调函数
// 组件内的连接关闭回调 - 组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭
// 就应该从管理的地方移除掉自己的信息
ClosedComplete ServerClosedCompleteCallback_;//组件内的关闭完成回调函数
public:
Connection(int fd,uint64_t connId,EventLoop* loop,ConnectionCallback connectedCallback);
~Connection();
//获取链接的文件描述符
int GetFd()const;
//获取链接的唯一ID
uint64_t GetConnId()const;
//获取链接的状态
ConnectionState GetState()const;
//判断链接是否处于Connected状态
bool Connected()const;
//设置链接的状态
void SetState(ConnectionState state);
//设置链接的上下文数据——链接完成时调用
void SetContext(const std::any& context);
//获取链接的上下文数据
std::any GetContext()const;
public:
//设置链接的回调函数
void SetConnectedCallback(ConnectionCallback callback);
//设置链接的消息回调函数
void SetMessageCallback(MessageCallback callback);
//设置链接的关闭完成回调函数
void SetClosedCompleteCallback(ClosedComplete callback);
//设置链接的任意回调函数
void SetAnyEventCallback(AnyCallback callback);
public:
//发送数据
void Send(const char* data,size_t len);
//关闭链接——不是实际的关闭接口,提供给组件使用者
void ShutDown();
//启用不活动链接的释放,并且定义多长时间无通信后释放
void EnableInactiveRelease(int sec);
//取消不活动链接的释放
void CancelInactiveRelease();
//链接建立时调用,进行channel回调设置,启动读取监控
void Established();
//协议切换,将链接的上下文数据设置为新的协议上下文数据
void UpgradeContext(const ConnectionCallback& callback,
const MessageCallback& messageCallback,
const ClosedComplete& closedCompleteCallback,
const AnyCallback& anyCallback,
const std::any& context);
private:
//五个链接事件的读取处理
//描述符触发可读事件后进行的处理,接收socket数据放到缓冲区中,调用Message回调函数
void HandleRead();
//描述符触发可写事件后进行的处理,将缓冲区中的数据发送到socket
void HandleWrite();
//描述符触发关闭事件后进行的处理,关闭链接
void HandleClose();
//描述符触发错误事件后进行的处理,关闭链接
void HandleError();
//描述符触发任意事件后进行的处理,调用AnyCallback回调函数
void HandleAnyEvent();
private:
//实际的释放关闭接口,只能内部执行
void ReleaseInLoop();
//连接获取之后,所处的状态下要进行各种设置(给channel设置事件回调,启动读取监控)
void EstablishedInLoop();
//Send的实际执行函数
void SendInLoop(const char* data,size_t len);
//ShutDown的实际执行函数
void ShutDownInLoop();
//EnableInactiveRelease的实际执行函数
void EnableInactiveReleaseInLoop(int sec);
//CancelInactiveRelease的实际执行函数
void CancelInactiveReleaseInLoop();
//UpgradeContext的实际执行函数
void UpgradeContextInLoop(const ConnectionCallback& callback,
const MessageCallback& messageCallback,
const ClosedComplete& closedCompleteCallback,
const AnyCallback& anyCallback,
const std::any& context);
private:
int fd_;//链接的文件描述符
uint64_t connId_;//链接的唯一ID
ConnectionState state_;//链接的状态
Socket socket_;//链接的socket
Buffer inputBuffer_;//输入缓冲区
Buffer outputBuffer_;//输出缓冲区
Channel channel_;//链接的channel
std::any context_;//链接的上下文数据
bool EnableInactiveRelease_;//是否启用不活动链接的释放
EventLoop* eventLoop_;//链接所属的事件循环
};
}
Connection.cpp
#include "Connection.hpp"
#include "Log.hpp"
#include <cstring>
#include <cassert>
#include <cerrno>
namespace ImMuduo
{
Connection::Connection(int fd, uint64_t connId, EventLoop* loop,
ConnectionCallback connectedCallback)
: ConnectedCallback_(std::move(connectedCallback)),
fd_(fd), connId_(connId), state_(ConnectionState::Connecting),
socket_(fd), channel_(fd, nullptr),
EnableInactiveRelease_(false), eventLoop_(loop)
{
channel_.SetCloseCallback([this]() { HandleClose(); });
channel_.SetReadCallback([this]() { HandleRead(); });
channel_.SetWriteCallback([this]() { HandleWrite(); });
channel_.SetErrorCallback([this]() { HandleError(); });
channel_.SetEventCallback([this]() { HandleAnyEvent(); });
}
Connection::~Connection()
{
DEBUG("Connection released, fd=%d, connId=%lu", fd_, connId_);
}
// ========== 基础 getter/setter ==========
int Connection::GetFd() const { return fd_; }
uint64_t Connection::GetConnId() const { return connId_; }
ConnectionState Connection::GetState() const { return state_; }
bool Connection::Connected() const
{
return state_ == ConnectionState::Connected;
}
void Connection::SetState(ConnectionState state) { state_ = state; }
void Connection::SetContext(const std::any& context) { context_ = context; }
std::any Connection::GetContext() const { return context_; }
// ========== 回调设置 ==========
void Connection::SetConnectedCallback(ConnectionCallback callback)
{ ConnectedCallback_ = std::move(callback); }
void Connection::SetMessageCallback(MessageCallback callback)
{ MessageCallback_ = std::move(callback); }
void Connection::SetClosedCompleteCallback(ClosedComplete callback)
{ ClosedCompleteCallback_ = std::move(callback); }
void Connection::SetAnyEventCallback(AnyCallback callback)
{ AnyEventCallback_ = std::move(callback); }
// ========== 对外接口(dispatch 到 event loop) ==========
void Connection::Send(const char* data, size_t len)
{
eventLoop_->RunInLoop([this, data = std::string(data, len)]() {
SendInLoop(data.c_str(), data.size());
});
}
void Connection::ShutDown()
{
eventLoop_->RunInLoop([this]() { ShutDownInLoop(); });
}
void Connection::EnableInactiveRelease(int sec)
{
eventLoop_->RunInLoop([this, sec]() { EnableInactiveReleaseInLoop(sec); });
}
void Connection::CancelInactiveRelease()
{
eventLoop_->RunInLoop([this]() { CancelInactiveReleaseInLoop(); });
}
void Connection::Established()
{
eventLoop_->RunInLoop([this]() { EstablishedInLoop(); });
}
void Connection::UpgradeContext(const ConnectionCallback& callback,
const MessageCallback& messageCallback,
const ClosedComplete& closedCompleteCallback,
const AnyCallback& anyCallback,
const std::any& context)
{
eventLoop_->RunInLoop([this, callback, messageCallback,
closedCompleteCallback, anyCallback, context]() {
UpgradeContextInLoop(callback, messageCallback,
closedCompleteCallback, anyCallback, context);
});
}
// ========== 事件处理 ==========
void Connection::HandleRead()
{
char buf[65536];
ssize_t n = socket_.RecvNoBlock(buf, sizeof(buf));
if (n > 0)
{
inputBuffer_.WriteAndPush(buf, n);
if (inputBuffer_.ReadableSize() > 0 && MessageCallback_)
{
MessageCallback_(shared_from_this(), &inputBuffer_);
}
}
else if (n == 0)
{
ShutDownInLoop();
}
else // n < 0
{
if (errno != EAGAIN && errno != EINTR)
{
ShutDownInLoop();
}
}
}
void Connection::HandleWrite()
{
if (outputBuffer_.ReadableSize() == 0) return;
ssize_t ret = socket_.SendNoBlock(outputBuffer_.ReadPos(),
outputBuffer_.ReadableSize());
if (ret > 0)
{
outputBuffer_.MoveReadPos(ret);
if (outputBuffer_.ReadableSize() == 0)
{
channel_.DisableWrite();
eventLoop_->UpdateEvent(&channel_);
if (state_ == ConnectionState::Disconnecting)
{
return ReleaseInLoop();
}
}
}
else if (ret < 0)
{
if (errno == EAGAIN || errno == EINTR) return;
ERROR("HandleWrite send failed: %s", strerror(errno));
ShutDownInLoop();
}
}
void Connection::HandleClose()
{
if (inputBuffer_.ReadableSize() > 0 && MessageCallback_)
MessageCallback_(shared_from_this(), &inputBuffer_);
ReleaseInLoop();
}
void Connection::HandleError()
{
ERROR("Connection error: %s", strerror(errno));
HandleClose();
}
void Connection::HandleAnyEvent()
{
if (EnableInactiveRelease_)
{
eventLoop_->TimerRefresh(connId_);
}
if (AnyEventCallback_)
{
AnyEventCallback_(shared_from_this());
}
}
// ========== InLoop 实现 ==========
void Connection::EstablishedInLoop()
{
assert(state_ == ConnectionState::Connecting);
state_ = ConnectionState::Connected;
channel_.EnableRead();
eventLoop_->UpdateEvent(&channel_);
if (ConnectedCallback_) ConnectedCallback_(shared_from_this());
}
void Connection::ReleaseInLoop()
{
state_ = ConnectionState::Disconnected;
channel_.Remove();
socket_.Close();
if (eventLoop_->TimerExist(connId_))
{
CancelInactiveReleaseInLoop();
}
if (ClosedCompleteCallback_) ClosedCompleteCallback_(shared_from_this());
if (ServerClosedCompleteCallback_) ServerClosedCompleteCallback_(shared_from_this());
}
void Connection::SendInLoop(const char* data, size_t len)
{
if (state_ == ConnectionState::Disconnected) return;
outputBuffer_.WriteAndPush(data, len);
if (!channel_.WriteAble())
{
channel_.EnableWrite();
eventLoop_->UpdateEvent(&channel_);
}
}
void Connection::ShutDownInLoop()
{
state_ = ConnectionState::Disconnecting;
if (inputBuffer_.ReadableSize() > 0)
{
if (MessageCallback_) MessageCallback_(shared_from_this(), &inputBuffer_);
channel_.DisableWrite();
eventLoop_->UpdateEvent(&channel_);
}
if (outputBuffer_.ReadableSize() > 0)
{
if (!channel_.WriteAble())
{
channel_.EnableWrite();
eventLoop_->UpdateEvent(&channel_);
}
}
if (outputBuffer_.ReadableSize() == 0)
{
ReleaseInLoop();
}
}
void Connection::EnableInactiveReleaseInLoop(int sec)
{
if (EnableInactiveRelease_) return;
EnableInactiveRelease_ = true;
if (eventLoop_->TimerExist(connId_))
{
return eventLoop_->TimerRefresh(connId_);
}
eventLoop_->TimerAdd(connId_, sec, [this]() {
ReleaseInLoop();
});
}
void Connection::CancelInactiveReleaseInLoop()
{
EnableInactiveRelease_ = false;
if (eventLoop_->TimerExist(connId_))
{
eventLoop_->TimerCancel(connId_);
}
}
void Connection::UpgradeContextInLoop(const ConnectionCallback& callback,
const MessageCallback& messageCallback,
const ClosedComplete& closedCompleteCallback,
const AnyCallback& anyCallback,
const std::any& context)
{
context_ = context;
ConnectedCallback_ = callback;
MessageCallback_ = messageCallback;
ClosedCompleteCallback_ = closedCompleteCallback;
AnyEventCallback_ = anyCallback;
}
}
Acceptor模块
设计思想
Acceptor模块:对监听套接字进行管理
1. 创建一个监听套接字
2. 启动读事件监控
3. 事件触发后,获取新连接
4. 调用新连接获取成功后的回调函数
5.为新连接创建Connection进行管理(这一步不是Acceptor模块操作,应该是服务器模块)
因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心
对于新连接如何处理,应该是服务器模块来管理的,服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数
源码
Acceptor.hpp
#pragma once
#include"Socket.hpp"
#include"EventLoop.hpp"
namespace ImMuduo
{
class Acceptor
{
using AcceptorCallback=std::function<void(int)>;
public:
Acceptor(EventLoop* eventLoop,int port);
~Acceptor()=default;
int CreateServer();
void setCallback(AcceptorCallback callback);
private:
// 获取新链接,调用回调函数
void HandleRead();
private:
AcceptorCallback callback_;//连接回调函数
Socket socket_;//创建监听套接字
EventLoop* eventLoop_; //监控监听套接字
std::unique_ptr<Channel> channel_; //监听套接字事件管理
};
}
Acceptor.cpp
#include "Acceptor.hpp"
#include"Log.hpp"
#include<cassert>
namespace ImMuduo
{
Acceptor::Acceptor(EventLoop* eventLoop, int port)
:eventLoop_(eventLoop)
{
socket_.CreateServer(port);
channel_ = std::make_unique<Channel>(socket_.fd(), nullptr);
channel_->EnableRead();
channel_->SetReadCallback([this]() { HandleRead(); });
eventLoop_->UpdateEvent(channel_.get());
}
int Acceptor::CreateServer()
{
return socket_.fd();
}
void Acceptor::setCallback(AcceptorCallback callback)
{
callback_ = callback;
}
void Acceptor::HandleRead()
{
int connfd=socket_.Accept();
if(callback_)
{
callback_(connfd);
}
}
}
本期内容到这里了,喜欢请点个赞谢谢
封面图自取:

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)