TCP并发服务器多路复用代码实现
引言
在写代码之前,我们必须要介绍一下多路复用。多路复用简单来说就是单线程完成多线程的任务。
因为在现实中,很多的客户端都会连接服务器,但是并不是每一个客户端都是每时每刻与服务器进行交流的。对于此,我们就有了多路复用。只要你客户端要交流,那么就进入我们的阻塞函数,这个阻塞函数更像是一个队列,进入阻塞函数的文件描述符就会被相应的函数调用,如果你没有任何事件的产生就一直不会进入阻塞函数,也就不会被调用。这样子单线程就解决了多线程的问题。
但是正是因为这是一个单线程,我们就不可以使用阻塞函数,因为你把路给堵死了,那剩下的事件不是就动不了了,一定要使用非阻塞函数,或者在一定条件下使用阻塞函数(比如select函数)

代码
作为服务器端的代码,我们这里选择的是select函数来完成多路复用。select函数关键就是他把文件描述符放到了一个集合里面,这个集合就像是一个数组,我们可以对这个集合进行插入,删除,清空等操作。所以我们这里是定义了两个集合,一个是tempfds,每一次调用都是调用这一个集合,而readfds这个集合的目的是一个模板,每一次删除或者增加都是在这个集合上面进行的,最后把这个集合拷贝给tempfds集合。
这个其中的主要原因是select他会改变集合里面的数据,他的底层是位图,凡是没有事件的文件描述符都会从这个集合里面被赶出去,如果只有一个readfds集合,那么这些被赶出去的数据就再也回不来了。而如果有了tempfds数据,我们只需要每次while()循环的最后吧readfds拷贝一份给tempfds即可。
最后我们只需要用这个FD_ISSET(sfd, &tempfds)来判断这个文件描述符是不是在这个集合里面。
除了以上的操作,我们还要注意一个细节就是因为select的底层是位图,那么它的操作就是遍历整个位图,所以我们才需要输入最大的文件描述符+1,因为位图的第一个数是从0开始的。那如果我们最后一个文件描述符对应的客户端退出了,也就是recv(i, rbuf, sizeof(rbuf), 0) = 0,现关闭并删除文件描述符,然后我们就要看是不是最大的文件描述符退出了,如果是的就要更新最大的文件描述符的值,如果没有就不管。那如果是新加的文件描述符呢,就看是不是最大的。(我们查看客户端是否退出的操作是最后那个for()循环,我们调用recv()并不是为了接受数据,因为我们没有想打印出数据,而是为了确认返回值,通过返回值知道这个客户端是否退出)
最后解释一下,sfd这个是与客户端连接的文件描述符,3号,是一定要存在的,所以之后文件描述符都是从4开始
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#define SER_PORT 8888
#define SER_IP "192.168.189.134"
int main() {
int sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1) {
perror("socket error");
return -1;
}
sockaddr_in sin;
sin.sin_addr.s_addr = inet_addr(SER_IP);
sin.sin_family = AF_INET;
sin.sin_port = htons(SER_PORT);
sockaddr_in cin;
socklen_t cinlen = sizeof(cin);
fd_set readfds, tempfds;
FD_ZERO(&readfds);
FD_SET(sfd, &readfds);
FD_SET(0, &readfds);
if (bind(sfd, (struct sockaddr *)& sin, sizeof(sin)) == -1) {
perror("bind error");
return -1;
}
if (listen(sfd, 128) == -1) {
perror("listen error");
return -1;
}
int maxfd = sfd; // 定义一个变量,用于存储最大的文件描述符
int newfd = -1;
// 定义一个地址信息结构体,用来存储客户端的地址信息
sockaddr_in cin_arr[1024]; // 大小是1024是因为集合的大小是1024
while(1) {
tempfds = readfds;
// 之所以加1是因为select底层是位图,而遍历的开始是从0位开始的,一直到maxfds位,所以要加1
int res = select(maxfd + 1, &tempfds, nullptr, nullptr, nullptr);
if (res == -1) {
perror("select error");
return -1;
} else if (res == 0) {
std::cout << "select timeout" << std::endl;
}
// sfd触发了事件,说明有客户端来请求连接
if (FD_ISSET(sfd, &tempfds)) {
newfd = accept(sfd, (struct sockaddr *)& cin, &cinlen);
if (newfd == -1) {
perror("accept error");
return -1;
}
// 将客户端对应的地址信息放到数组对应的位置
cin_arr[newfd] = cin;
FD_SET(newfd, &readfds);
// 更新maxfd
if (maxfd < newfd) {
maxfd = newfd;
}
}
if (FD_ISSET(0, &tempfds)) {
char wbuf[128] = "";
bzero(wbuf, sizeof(wbuf));
fgets(wbuf, sizeof(wbuf), stdin);
wbuf[strlen(wbuf) - 1] = 0;
// 将输入的数据发送给所有的客户端
for (int i = 4; i <= maxfd; i++) {
send(i, wbuf, strlen(wbuf), 0);
}
}
for(int i = 4; i <= maxfd; i++) {
if(FD_ISSET(i, &tempfds)) {
char rbuf[128] = "";
bzero(rbuf, sizeof(rbuf));
int res = recv(i, rbuf, sizeof(rbuf), 0);
if(res == 0) {
std::cout << "client close" << std::endl;
close(i);
FD_CLR(i, &readfds);
for(int j = maxfd; j >= 4; j--) { // 只保证最高值退出才更新,中间值退出不需要更新
if(FD_ISSET(j, &readfds)) {
maxfd = j;
break;
}
}
continue; // 结束本次的阻塞,继续下一个客户端
}
}
}
}
close(sfd);
return 0;
}
作为客户端,这一次我们选择用poll来完成多路复用,主要的操作区别就是poll这里使用的几个结构体,结构体里面event表示的是我们可能会做的事情,POLLIN表示读操作,POLLOUT表示写操作,而revent表示实际上做的事情,所以我们初始化应该初始化event,做事件判断的时候,应该看revent,到底是什么事件来的
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <poll.h> // 包含poll头文件
#define SER_PORT 8888
#define SER_IP "192.168.189.134"
#define CLI_PORT 9999
#define CLI_IP "192.168.189.134"
int main(int argc, char const *argv[])
{
// 1、创建用于通信的客户端套接字文件描述符
int cfd = socket(AF_INET, SOCK_STREAM, 0);
if (cfd == -1)
{
perror("socket error");
return -1;
}
printf("socket success cfd = %d\n", cfd); // 3
// 2、绑定ip地址和端口号(可选)
// 2.1填充要绑定的地址信息结构体
struct sockaddr_in cin;
cin.sin_family = AF_INET;
cin.sin_port = htons(SER_PORT);
cin.sin_addr.s_addr = inet_addr(SER_IP);
// 2.2绑定工作
if (bind(cfd, (struct sockaddr *)&cin, sizeof(cin)) == -1)
{
perror("bind error");
return -1;
}
printf("bind success\n");
// 3、连接服务器
// 3.1填充要连接的服务器地址信息结构体
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(SER_PORT);
sin.sin_addr.s_addr = inet_addr(SER_IP);
// 3.2连接工作
if (connect(cfd, (struct sockaddr *)&sin, sizeof(sin)) == -1)
{
perror("connect error");
return -1;
}
printf("连接服务器成功\n");
// 使用poll来完成终端写入数据和套接字接收数据的并发
struct pollfd pfds[2]; // pfd[0] pfd[1]
// 分别给两个文件描述符成员赋值
pfds[0].fd = 0; // 检测的是0号
pfds[0].events = POLLIN; // 表示的是读事件
pfds[1].fd = cfd; // 检测的是cfd文件描述符
pfds[1].events = POLLIN; // 检测的是读事件
// 4、数据收发
char wbuf[128] = "";
while (1)
{
int res = poll(pfds, 2, -1);
// 功能:阻塞等待文件描述符集合中是否有事件产生
// 参数1:文件描述符集合的起始地址
// 参数2:文件描述符个数
// 参数3:表示永久等待
if (res == -1)
{
perror("poll error");
return -1;
}
// 程序执行至此,表示文件描述符容器中,有事件产生
// 表示0号文件描述符的事件
if (pfds[0].revents == POLLIN)
{
// 清空容器
bzero(wbuf, sizeof(wbuf));
// 从终端获取数据
fgets(wbuf, sizeof(wbuf), stdin); // 0
wbuf[strlen(wbuf) - 1] = 0;
// 将数据发送给服务器
if (send(cfd, wbuf, sizeof(wbuf), 0) == -1)
{
perror("send error");
return -1;
}
}
//表示服务器端发来消息
if (pfds[1].revents == POLLIN)
{
// 接受服务器发送过来的消息
if (recv(cfd, wbuf, sizeof(wbuf), 0) == 0)
{ // cfd
printf("对端已经下线\n");
break;
}
printf("收到服务器消息为:%s\n", wbuf);
}
}
// 5、关闭套接字
close(cfd);
return 0;
}
总结
可能大家会有疑问,不是说不能用阻塞函数吗,怎么这里还是用recv,因为我们这里recv()的调用条件是已经有数据发过来了,recv一调用,就立马读取数据,然后返回,并不会阻塞在那里,继续等待数据的到来。
本篇文章就到这里结束了!!!!希望可以帮助大家更好的理解多路复用~~~
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)