【Linux网络编程】13. 多路转接 poll
本文介绍了IO模型中的IO多路转接之一poll,包括它的用法,以及poll服务器的实现等等内容
·
文章目录
一、I/O 多路转接之 poll
1、初识 poll
作用: 和 select 一样属于 IO 多路复用,用于同时监听多个文件描述符。
原理: 基于 struct pollfd 结构体数组管理 fd,没有最大文件描述符数量限制,解决了 select 位图大小受限的缺点。
改进点: 无需每次重新设置监听集合,接口更简洁,是 select 的优化版本。
1)poll 函数
2)pollfd 结构体
3)poll 执行过程
poll 核心是事件结构体数组,不再使用位图,没有最大 fd 数量限制。
以监听 3 个 fd(1、2、5)为例:
- 定义 pollfd 数组,设置要监听的 fd 和事件(POLLIN 可读)
- 初始化: 把 fd=1、2、5 加入数组,统一监听可读事件
- 调用 poll (): 阻塞等待事件就绪
- 内核检测到 fd=1、2 就绪,poll 返回
- 遍历数组: 通过 revents 字段判断哪些 fd 就绪
- 未就绪的 fd 保留在数组中,不会被清空
总结
- poll 用结构体数组管理 fd,无最大 4096 限制
- 事件不会自动清空,无需重复添加 fd
- 只需遍历数组判断 revents,使用更简单
2、socket 就绪条件
和select一样
-
读就绪
- 接收缓冲区数据 ≥ 低水位标记(SO_RCVLOWAT)
- 对端关闭连接(读返回 0)
- 监听 socket 有新连接
- socket 有未处理错误
-
写就绪
- 发送缓冲区空闲 ≥ 低水位标记(SO_SNDLOWAT)
- 写关闭,写操作触发 SIGPIPE
- 非阻塞 connect 完成
- socket 有未处理错误
-
异常就绪
- 收到 TCP 带外(紧急)数据
3、poll 优点
- 使用简单: 无需维护位图,用结构体数组统一管理
- 接口友好: 监听事件与就绪事件分开,不用重复重置
- 无数量上限: 不受固定大小限制,支持更多文件描述符
- 通用性强: 跨平台使用更方便,无需关注底层结构
4、poll 缺点
- 遍历效率低: 返回后仍需遍历所有 fd 找就绪事件
- 拷贝开销大: 每次调用都要拷贝大量 pollfd 到内核态
- 性能随连接下降: 监听 fd 越多,效率越低
- 核心问题未解决: 和 select 一样存在轮询浪费,仅优化了接口与数量限制
5、poll 监控标准输入
1)test.cc
#include<iostream>
#include<poll.h>
#include<unistd.h>
int main()
{
pollfd poll_fd;
poll_fd.fd = 0; // 监听标准输入
poll_fd.events = POLLIN; // 监听可读事件
while(true)
{
// poll:监听1个fd,超时1000毫秒
int ret = poll(&poll_fd, 1, 1000);
if(ret<0) // 调用出错
{
perror("poll");
continue;
}
if(ret==0) // 超时无事件
{
std::cout << "poll timeout" << std::endl;
continue;
}
// 判断是否是可读事件就绪
if(poll_fd.revents&POLLIN)
{
char buf[1024] = {0};
read(0, buf, sizeof(buf) - 1);
std::cout << "stdin: " << buf;
}
}
return 0;
}
6、poll 服务器
代码结构:
Mutex.hpp
Log.hpp
Common.hpp
InetAddr.hpp
Socket.hpp
PollServer.hpp
Main.cc
1)Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexModule
{
// 互斥锁封装类
class Mutex
{
public:
// 初始化锁
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
// 加锁
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
}
// 解锁
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
}
// 获取锁指针
pthread_mutex_t *Get()
{
return &_mutex;
}
// 销毁锁
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
// RAII 自动加解锁
class LockGuard
{
public:
LockGuard(Mutex &mutex) : _mutex(mutex)
{
_mutex.Lock(); // 构造时加锁
}
~LockGuard()
{
_mutex.Unlock(); // 析构时解锁
}
private:
Mutex &_mutex;
};
}
2)Log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <memory>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Mutex.hpp"
namespace LogModule
{
using namespace MutexModule;
const std::string gsep = "\r\n";
// 日志策略基类(接口)
class LogStrategy
{
public:
~LogStrategy() = default;
virtual void SyncLog(const std::string &message) = 0;
};
// 控制台日志输出(线程安全)
class ConsoleLogStrategy : public LogStrategy
{
public:
void SyncLog(const std::string &message) override
{
LockGuard lockguard(_mutex);
std::cout << message << gsep;
}
~ConsoleLogStrategy() {}
private:
Mutex _mutex;
};
const std::string defaultpath = "./log/"; // /var/log
const std::string defaultfile = "my.log";
// 文件日志输出(自动建目录、线程安全)
class FileLogStrategy : public LogStrategy
{
public:
FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile)
: _path(path), _file(file)
{
LockGuard lockguard(_mutex);
if (std::filesystem::exists(_path))
return;
try
{
std::filesystem::create_directories(_path);
}
catch (const std::filesystem::filesystem_error &e)
{
std::cerr << e.what() << std::endl;
}
}
// 追加写入日志文件
void SyncLog(const std::string &message) override
{
LockGuard lockguard(_mutex);
std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file;
std::ofstream out(filename, std::ios::app);
if (!out.is_open())
return;
out << message << gsep;
out.close();
}
private:
std::string _path;
std::string _file;
Mutex _mutex;
};
// 日志等级类
enum class LogLevel
{
DEBUG,
INFO,
WARNING,
ERROR,
FATAL
};
// 日志等级转字符串
std::string LevelStr(LogLevel level)
{
switch (level)
{
case LogLevel::DEBUG:
return "DEBUG";
case LogLevel::INFO:
return "INFO";
case LogLevel::WARNING:
return "WARNING";
case LogLevel::ERROR:
return "ERROR";
case LogLevel::FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
// 获取格式化时间字符串(线程安全)
std::string GetTimeStamp()
{
time_t curr = time(nullptr);
struct tm curr_tm; // 出参
localtime_r(&curr, &curr_tm);
char buf[128]; // 出参
snprintf(buf, sizeof(buf), "%4d-%02d-%02d %02d:%02d:%02d",
curr_tm.tm_year + 1900,
curr_tm.tm_mon + 1,
curr_tm.tm_mday,
curr_tm.tm_hour,
curr_tm.tm_min,
curr_tm.tm_sec);
return buf;
}
// 日志核心管理类
class Logger
{
public:
Logger()
{
EnableConsoleLogStrategy();
}
// 切换为文件输出
void EnableFileLogStrategy()
{
_fflush_strategy = std::make_unique<FileLogStrategy>();
}
// 切换为控制台输出
void EnableConsoleLogStrategy()
{
_fflush_strategy = std::make_unique<ConsoleLogStrategy>();
}
// 日志消息构造: 负责拼接内容, 析构时自动输出
class LogMessage
{
public:
// 构造日志头部(时间、等级、进程ID、文件名、行号)
LogMessage(LogLevel level, std::string src_name, int line_number, Logger &logger)
: _logger(logger)
{
std::stringstream ss;
ss << "[" << GetTimeStamp() << "] "
<< "[" << LevelStr(level) << "] "
<< "[" << getpid() << "] "
<< "[" << src_name << "] "
<< "[" << line_number << "] - ";
_loginfo = ss.str();
}
// 流方式拼接日志内容
template <typename T>
LogMessage &operator<<(const T &info)
{
std::stringstream ss;
ss << info;
_loginfo += ss.str();
return *this;
}
// 析构自动输出日志
~LogMessage()
{
if (_logger._fflush_strategy)
{
_logger._fflush_strategy->SyncLog(_loginfo);
}
}
private:
std::string _loginfo;
Logger &_logger;
};
// 仿函数接口, 创建日志消息
LogMessage operator()(LogLevel level, std::string name, int line)
{
return LogMessage(level, name, line, *this);
}
private:
std::unique_ptr<LogStrategy> _fflush_strategy;
};
Logger logger;
// 简化调用宏
#define LOG(level) logger(level, __FILE__, __LINE__)
#define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()
#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
}
3)Common.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/wait.h>
// 程序退出状态码枚举
enum ExitCode
{
OK = 0, // 正常
USAGE_ERR, // 参数错误
SOCKET_ERR, // Socket 创建失败
BIND_ERR, // 绑定失败
LISTEN_ERR, // 监听失败
CONNECT_ERR, // 连接失败
FORK_ERR, // 进程创建失败
OPEN_ERR, // 打开文件失败
};
// 禁止拷贝基类(继承后无法拷贝/赋值)
class NoCopy
{
public:
NoCopy() {}
~NoCopy() {}
NoCopy(const NoCopy &) = delete;
NoCopy &operator=(const NoCopy &) = delete;
};
// 地址类型转换宏: sockaddr_in → sockaddr*
#define CONV(addr) ((struct sockaddr *)&addr)
4)InetAddr.hpp
#pragma once
#include "Common.hpp"
// IPv4地址封装: 主机格式 <-> 网络格式
class InetAddr
{
public:
// 默认构造
InetAddr() {}
// 构造: 网络地址 -> 主机格式
InetAddr(struct sockaddr_in &addr)
: _addr(addr)
{
_port = ntohs(_addr.sin_port); // port
char ipbuffer[64]; // ip
inet_ntop(AF_INET, &_addr.sin_addr, ipbuffer, sizeof(ipbuffer));
_ip = ipbuffer;
}
// 构造:指定IP+端口 → 网络格式
InetAddr(const std::string &ip, uint16_t port)
: _ip(ip), _port(port)
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr); // ip
_addr.sin_port = htons(_port); // port
}
// 构造:仅端口, 绑定本机所有IP
InetAddr(uint16_t port)
: _port(port), _ip()
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
_addr.sin_addr.s_addr = INADDR_ANY; // ip
_addr.sin_port = htons(_port); // port
}
// 设置网络地址信息
void SetAddr(struct sockaddr_in &addr)
{
_addr = addr;
_port = ntohs(_addr.sin_port);
char ipbuffer[64];
inet_ntop(AF_INET, &_addr.sin_addr, ipbuffer, sizeof(ipbuffer));
_ip = ipbuffer;
}
// 获取点分十进制IP
std::string Ip() { return _ip; }
// 获取主机字节序端口
uint16_t Port() { return _port; }
// 获取原生网络地址结构体
const struct sockaddr_in &NetAddr() { return _addr; }
// 获取通用地址指针(用于系统调用)
const struct sockaddr *NetAddrPtr() { return CONV(_addr); }
// 获取地址长度
socklen_t NetAddrLen() { return sizeof(_addr); }
// 比较两个地址是否相同
bool operator==(const InetAddr &addr)
{
return addr._ip == _ip && addr._port == _port;
}
// 转为 ip:port 格式字符串
std::string StringAddr()
{
return _ip + ":" + std::to_string(_port);
}
~InetAddr() {}
private:
struct sockaddr_in _addr; // 网络字节序地址
std::string _ip; // 点分十进制IP
uint16_t _port; // 主机字节序端口
};
5)Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstdlib>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"
namespace SocketModule
{
using namespace LogModule;
const static int gbacklog = 16; // 监听队列长度
// 抽象Socket接口(模板方法)
class Socket
{
public:
virtual ~Socket() {}
virtual void SocketOrDie() = 0; // 创建socket
virtual void BindOrDie(uint16_t port) = 0; // 绑定端口
virtual void ListenOrDie(int backlog) = 0; // 监听
virtual std::shared_ptr<Socket> Accept(InetAddr *client) = 0; // 接收连接
virtual void Close() = 0; // 关闭socket
virtual int Recv(std::string *out) = 0; // 接收数据
virtual int Send(const std::string &message) = 0; // 发送数据
virtual int Connect(const std::string &server_ip, uint16_t port) = 0; // 连接服务端
virtual int Fd() = 0; // 获取文件描述符
public:
// 构建TCP服务端
void BuildTcpSocketMethod(uint16_t port, int backlog = gbacklog)
{
SocketOrDie();
BindOrDie(port);
ListenOrDie(backlog);
}
// 构建TCP客户端
void BuildTcpClientSocketMethod()
{
SocketOrDie();
}
};
const static int defaultfd = -1;
// TCP套接字实现
class TcpSocket : public Socket
{
public:
TcpSocket() : _sockfd(defaultfd) {}
TcpSocket(int fd) : _sockfd(fd) {}
~TcpSocket() {}
// 创建socket
void SocketOrDie() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(LogLevel::FATAL) << "socket error";
exit(SOCKET_ERR);
}
LOG(LogLevel::INFO) << "socket success";
}
// 绑定端口
void BindOrDie(uint16_t port) override
{
InetAddr localaddr(port);
int n = ::bind(_sockfd, localaddr.NetAddrPtr(), localaddr.NetAddrLen());
if (n < 0)
{
LOG(LogLevel::FATAL) << "bind error";
exit(BIND_ERR);
}
LOG(LogLevel::INFO) << "bind success";
}
// 监听
void ListenOrDie(int backlog) override
{
int n = ::listen(_sockfd, backlog);
if (n < 0)
{
LOG(LogLevel::FATAL) << "listen error";
exit(LISTEN_ERR);
}
LOG(LogLevel::INFO) << "listen success";
}
// 接收客户端连接
std::shared_ptr<Socket> Accept(InetAddr *client) override
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 阻塞等待客户端连接
int fd = ::accept(_sockfd, CONV(peer), &len);
if (fd < 0)
{
LOG(LogLevel::WARNING) << "accept warning ...";
return nullptr;
}
// 保存客户端地址
client->SetAddr(peer);
// 返回新连接的socket对象
return std::make_shared<TcpSocket>(fd);
}
// 接收数据 出参out
int Recv(std::string *out) override
{
char buffer[1024];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer;
}
return n;
}
// 发送数据
int Send(const std::string &message) override
{
return send(_sockfd, message.c_str(), message.size(), 0);
}
// 连接服务端
int Connect(const std::string &server_ip, uint16_t port) override
{
InetAddr server(server_ip, port);
return ::connect(_sockfd, server.NetAddrPtr(), server.NetAddrLen());
}
// 关闭套接字
void Close() override
{
if (_sockfd >= 0)
::close(_sockfd);
}
// 获取文件描述符
int Fd() override
{
return _sockfd;
}
private:
int _sockfd; // socket文件描述符
};
}
6)PollServer.hpp
#pragma once
#include <iostream>
#include <memory>
#include <unistd.h>
#include <sys/poll.h>
#include "Log.hpp"
#include "Socket.hpp"
using namespace LogModule;
using namespace SocketModule;
// Poll 多路复用服务器
class PollServer
{
const static int size = 4096; // 最大支持连接数
const static int defaultfd = -1;
public:
// 构造:初始化监听套接字
PollServer(int port)
: _listensock(std::make_unique<TcpSocket>()),
_isrunning(false)
{
_listensock->BuildTcpSocketMethod(port);
// 初始化pollfd数组
for (int i = 0; i < size; i++)
{
_fds[i].fd = defaultfd;
_fds[i].events = 0;
_fds[i].revents = 0;
}
// 将监听fd加入数组,关注读事件
_fds[0].fd = _listensock->Fd();
_fds[0].events = POLLIN;
}
// 打印当前监控的 fd
void PrintFd()
{
std::cout << "_fds[]: ";
for (int i = 0; i < size; i++)
{
if (_fds[i].fd == defaultfd)
continue;
std::cout << _fds[i].fd << " ";
}
std::cout << "\r\n";
}
// 接受新连接
void Accepter()
{
InetAddr client;
std::shared_ptr<Socket> new_socket = _listensock->Accept(&client);
if (new_socket == nullptr)
{
LOG(LogLevel::WARNING) << "accept failed";
return;
}
int sockfd = new_socket->Fd();
LOG(LogLevel::INFO) << "get a new link, sockfd: "
<< sockfd << ", client is: " << client.StringAddr();
// 找空位存入新 fd
int pos = 0;
for (; pos < size; pos++)
{
if (_fds[pos].fd == defaultfd)
break;
}
if (pos == size)
{
LOG(LogLevel::WARNING) << "poll server full";
close(sockfd);
}
else
{
// 将新连接加入 poll 监控
_fds[pos].fd = sockfd;
_fds[pos].events = POLLIN;
_fds[pos].revents = 0;
}
}
// 读取客户端数据
void Recver(int pos)
{
char buffer[1024];
ssize_t n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client say@ " << buffer << std::endl;
}
else if (n == 0)
{
// 客户端断开,清理 fd
LOG(LogLevel::INFO) << "clien quit...";
close(_fds[pos].fd);
_fds[pos].fd = defaultfd;
_fds[pos].events = 0;
_fds[pos].revents = 0;
}
else
{
// 客户端出错,清理 fd
LOG(LogLevel::ERROR) << "recv error";
close(_fds[pos].fd);
_fds[pos].fd = defaultfd;
_fds[pos].events = 0;
_fds[pos].revents = 0;
}
}
// 事件派发:区分监听fd / 通信fd
void Dispatcher()
{
for (int i = 0; i < size; i++)
{
if (_fds[i].fd == defaultfd)
continue;
// 检测是否是读事件就绪
if (_fds[i].revents & POLLIN)
{
if (_fds[i].fd == _listensock->Fd())
Accepter(); // 监听fd: 新连接
else
Recver(i); // 通信fd:读数据
}
}
}
// 启动服务器
void Start()
{
int timeout = -1; // -1 表示永久阻塞
_isrunning = true;
while (_isrunning)
{
PrintFd();
// 调用 poll 等待事件
int n = poll(_fds, size, timeout);
switch (n)
{
case -1:
LOG(LogLevel::ERROR) << "poll error";
break;
case 0:
LOG(LogLevel::INFO) << "poll time out...";
break;
default:
LOG(LogLevel::DEBUG) << "有事件就绪了..., n : " << n;
Dispatcher(); // 处理就绪事件
break;
}
}
_isrunning = false;
}
// 停止服务
void Stop()
{
_isrunning = false;
}
~PollServer() {}
private:
std::unique_ptr<Socket> _listensock; // 监听套接字
bool _isrunning; // 运行状态
struct pollfd _fds[size]; // poll 监控数组
};
7)Main.cc
#include "Common.hpp"
#include "PollServer.hpp"
// ./pollserver port
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cout << "Usage: " << argv[0] << " port" << std::endl;
exit(USAGE_ERR);
}
// Enable_Console_Log_Strategy();
uint16_t port = std::stoi(argv[1]);
// 创建并启动服务器`
std::unique_ptr<PollServer> svr = std::make_unique<PollServer>(port);
svr->Start();
return 0;
}
客户端a
客户端b
客户端c
服务端
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)