UNIX Network Programming The Sockets Networking API学习:第23章 -第28章
SCTP(Stream Control Transmission Protocol,流控制传输协议)是一种面向消息的协议。与TCP的字节流不同,SCTP保留了应用层消息的边界。消息传递的两种方式:23.2 自动关闭(Autoclose)问题背景在一对多风格()的SCTP服务器中,服务器不主动关闭连接,依赖客户端关闭。若客户端建立连接后什么也不发,服务器资源就被白白占用,形成变相的拒绝服务攻击。设置
第23章:SCTP 高级套接字
23.1 概述
SCTP(Stream Control Transmission Protocol,流控制传输协议)是一种面向消息的协议。
与TCP的字节流不同,SCTP保留了应用层消息的边界。
消息传递的两种方式:
整条消息一次收完(常见情况)
┌─────────────────────────────┐
│ 完整消息 [Hello World] │
└─────────────────────────────┘
一次 recvmsg 拿走
消息太大时,分多次交付(部分交付)
┌────────┐ ┌────────┐ ┌────────┐
│ 片段1 │ │ 片段2 │ │ 片段3 │
└────────┘ └────────┘ └────────┘
第1次recv 第2次recv 第3次recv(MSG_EOR置1,表示消息结束)
SCTP 绝不会把两条消息的片段混在一起,这是与TCP最重要的区别之一。
23.2 自动关闭(Autoclose)
问题背景
在一对多风格(SOCK_SEQPACKET)的SCTP服务器中,服务器不主动关闭连接,依赖客户端关闭。
若客户端建立连接后什么也不发,服务器资源就被白白占用,形成变相的拒绝服务攻击。
解决方案:SCTP_AUTOCLOSE
设置一个最大空闲时间,超过这个时间,SCTP栈自动关闭该关联(association)。
核心代码及注释:
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/sctp.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
// 编译: g++ -o server server.cpp -lsctp
int main(int argc, char* argv[]) {
int sock_fd;
struct sockaddr_in servaddr;
int close_time;
// 创建一对多风格的 SCTP 套接字
// AF_INET: IPv4
// SOCK_SEQPACKET: 一对多风格
// IPPROTO_SCTP: 使用SCTP协议
sock_fd = socket(AF_INET, SOCK_SEQPACKET, IPPROTO_SCTP);
if (sock_fd < 0) { perror("socket"); return 1; }
// 设置自动关闭时间为 120 秒
// 如果某个关联在 120 秒内没有任何用户数据传输(双向均无),
// SCTP 栈会自动将其关闭,释放资源
close_time = 120;
if (setsockopt(sock_fd, IPPROTO_SCTP, SCTP_AUTOCLOSE,
&close_time, sizeof(close_time)) < 0) {
perror("setsockopt SCTP_AUTOCLOSE");
return 1;
}
// 绑定地址
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9877); // SERV_PORT
bind(sock_fd, (struct sockaddr*)&servaddr, sizeof(servaddr));
listen(sock_fd, 5);
// ... 后续收发逻辑不变
// 超过 120 秒空闲的关联会被自动清除,无需任何额外代码
return 0;
}
注意事项:
autoclose 时间不能设得太小,否则服务器可能在想要回复客户端时,发现关联已被关闭,
还需要重新建立关联,带来额外开销,且客户端通常不会主动监听新连接。
23.3 部分交付(Partial Delivery)
触发条件
当一条消息"很大"时(KAME实现中,默认阈值是接收缓冲区的一半,即 655366553665536 字节),
SCTP栈没有足够空间一次性缓存整条消息,就会启动部分交付API,分批将消息交给应用层。
三条约束:
- 消息消耗的缓冲区必须达到或超过阈值
- 只能从消息起始处按顺序交付,遇到缺失片段就停止
- 部分交付期间,其他所有消息(包括其他流)都被阻塞,直到当前大消息完全交付
判断是否消息结束
msg_flags 中的 MSG_EOR 标志:
MSG_EOR = 1 → 当前消息已完整接收(End Of Record)
MSG_EOR = 0 → 消息还没接收完,需要继续读
封装函数实现及详细注释
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/sctp.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
// 每次扩容时增加的字节数(示例值)
#define SCTP_PDAPI_INCR_SZ (65536)
// 当剩余空间小于此值时,触发扩容
#define SCTP_PDAPI_NEED_MORE_THRESHOLD (8192)
// 静态缓冲区:跨调用复用,避免反复 malloc
static uint8_t* sctp_pdapi_readbuf = NULL;
static int sctp_pdapi_rdbuf_sz = 0;
/**
* pdapi_recvmsg - 封装了部分交付API的接收函数
*
* @sock_fd : SCTP套接字描述符
* @rdlen : [输出] 实际读到的总字节数
* @from : [输出] 对端地址
* @from_len : [输入/输出] 地址结构长度
* @sri : [输出] SCTP发送/接收信息(流号、SSN等)
* @msg_flags : [输出] 消息标志(含MSG_EOR)
*
* 返回值:指向静态缓冲区的指针(包含完整消息),失败返回NULL
*/
uint8_t* pdapi_recvmsg(int sock_fd, int* rdlen,
struct sockaddr* from, int* from_len,
struct sctp_sndrcvinfo* sri, int* msg_flags)
{
int rdsz, left, at_in_buf;
int frmlen = 0;
// ── 第一步:确保静态缓冲区已分配 ──────────────────────────────
if (sctp_pdapi_readbuf == NULL) {
sctp_pdapi_readbuf = (uint8_t*)malloc(SCTP_PDAPI_INCR_SZ);
if (!sctp_pdapi_readbuf) { *rdlen = -1; return NULL; }
sctp_pdapi_rdbuf_sz = SCTP_PDAPI_INCR_SZ;
}
// ── 第二步:读取第一块数据 ────────────────────────────────────
// sctp_recvmsg 类似于 recvmsg,但专为SCTP设计
// at_in_buf 记录当前缓冲区中已有的字节数
at_in_buf = sctp_recvmsg(sock_fd,
sctp_pdapi_readbuf,
sctp_pdapi_rdbuf_sz,
from, (socklen_t*)from_len,
sri, msg_flags);
// 读取出错或对端关闭连接(返回0)
if (at_in_buf < 1) {
*rdlen = at_in_buf;
return NULL;
}
// ── 第三步:循环读取,直到 MSG_EOR 被置位 ───────────────────
// MSG_EOR(End Of Record)置1表示消息已完整接收
while ((*msg_flags & MSG_EOR) == 0) {
// 计算缓冲区剩余空间
left = sctp_pdapi_rdbuf_sz - at_in_buf;
// 若剩余空间不足,则扩容
if (left < SCTP_PDAPI_NEED_MORE_THRESHOLD) {
// realloc 保留原有数据,并在末尾追加新空间
sctp_pdapi_readbuf = (uint8_t*)realloc(
sctp_pdapi_readbuf,
sctp_pdapi_rdbuf_sz + SCTP_PDAPI_INCR_SZ);
if (sctp_pdapi_readbuf == NULL) {
// 内存不足,程序退出(生产代码应更优雅地处理)
fprintf(stderr, "sctp_pdapi ran out of memory\n");
exit(1);
}
sctp_pdapi_rdbuf_sz += SCTP_PDAPI_INCR_SZ;
left = sctp_pdapi_rdbuf_sz - at_in_buf;
}
// 从缓冲区当前位置(at_in_buf)继续接收数据
// 注意:from 和 sri 传 NULL,因为续读时不需要重复获取地址和控制信息
rdsz = sctp_recvmsg(sock_fd,
&sctp_pdapi_readbuf[at_in_buf],
left,
NULL, (socklen_t*)&frmlen,
NULL, msg_flags);
// 累计已读字节数,继续检查 MSG_EOR
at_in_buf += rdsz;
}
// ── 第四步:消息完整,返回结果 ──────────────────────────────
*rdlen = at_in_buf;
return sctp_pdapi_readbuf; // 指向完整消息的缓冲区
}
int main() {
// 示例:如何使用 pdapi_recvmsg
// (完整服务器代码需要先创建套接字、绑定、监听)
printf("pdapi_recvmsg 封装函数定义完成,可在服务器循环中调用。\n");
return 0;
}
23.4 通知(Notifications)
SCTP可以把传输层发生的事件以通知消息的形式上报给应用层,共7种事件。
7种通知事件
| 事件常量 | 含义 |
|---|---|
SCTP_ASSOC_CHANGE |
关联状态变化(建立/断开/重启) |
SCTP_PEER_ADDR_CHANGE |
对端地址状态变化(可用/不可达/增删) |
SCTP_REMOTE_ERROR |
收到对端的错误块 |
SCTP_SEND_FAILED |
消息发送失败(关联断开或部分可靠超时) |
SCTP_ADAPTION_INDICATION |
收到适配层指示(握手时协商的32位值) |
SCTP_PARTIAL_DELIVERY_EVENT |
部分交付被中止 |
SCTP_SHUTDOWN_EVENT |
对端发起了优雅关闭 |
启用所有通知
#include <netinet/sctp.h>
// 填充事件订阅结构体,每个字段设为1表示订阅该事件
struct sctp_event_subscribe evnts;
memset(&evnts, 0, sizeof(evnts));
evnts.sctp_data_io_event = 1; // 数据IO事件(必须开,否则收不到数据通知)
evnts.sctp_association_event = 1; // 关联变化
evnts.sctp_address_event = 1; // 地址变化
evnts.sctp_send_failure_event = 1; // 发送失败
evnts.sctp_peer_error_event = 1; // 对端错误
evnts.sctp_shutdown_event = 1; // 关闭事件
evnts.sctp_partial_delivery_event = 1; // 部分交付事件
evnts.sctp_adaption_layer_event = 1; // 适配层事件
setsockopt(sock_fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts));
如何区分通知与普通数据
收到消息后检查 msg_flags:
if (msg_flags & MSG_NOTIFICATION) {
// 这是一条通知,不是用户数据
print_notification(readbuf);
} else {
// 这是普通用户数据
process_data(readbuf, rd_sz);
}
通知解析函数及注释
#include <stdio.h>
#include <netinet/sctp.h>
#include <arpa/inet.h>
/**
* print_notification - 解析并打印SCTP通知
* @notify_buf: 从 sctp_recvmsg 读到的缓冲区(MSG_NOTIFICATION已置位)
*/
void print_notification(char* notify_buf)
{
// 将缓冲区强制转型为通知联合体
// union sctp_notification 覆盖了所有可能的通知类型
union sctp_notification* snp =
(union sctp_notification*)notify_buf;
const char* str;
// 根据通知类型分发处理
switch (snp->sn_header.sn_type) {
// ── 关联状态变化 ──────────────────────────────────────────────
case SCTP_ASSOC_CHANGE: {
struct sctp_assoc_change* sac = &snp->sn_assoc_change;
switch (sac->sac_state) {
case SCTP_COMM_UP: str = "COMMUNICATION UP"; break;
case SCTP_COMM_LOST: str = "COMMUNICATION LOST"; break;
case SCTP_RESTART: str = "RESTART"; break;
case SCTP_SHUTDOWN_COMP: str = "SHUTDOWN COMPLETE"; break;
case SCTP_CANT_STR_ASSOC:str = "CAN'T START ASSOC"; break;
default: str = "UNKNOWN"; break;
}
printf("SCTP_ASSOC_CHANGE: %s, assoc=0x%x\n",
str, (uint32_t)sac->sac_assoc_id);
break;
}
// ── 对端地址变化 ──────────────────────────────────────────────
case SCTP_PEER_ADDR_CHANGE: {
struct sctp_paddr_change* spc = &snp->sn_paddr_change;
switch (spc->spc_state) {
case SCTP_ADDR_AVAILABLE: str = "ADDRESS AVAILABLE"; break;
case SCTP_ADDR_UNREACHABLE:str = "ADDRESS UNREACHABLE"; break;
case SCTP_ADDR_REMOVED: str = "ADDRESS REMOVED"; break;
case SCTP_ADDR_ADDED: str = "ADDRESS ADDED"; break;
case SCTP_ADDR_MADE_PRIM: str = "ADDRESS MADE PRIMARY"; break;
default: str = "UNKNOWN"; break;
}
// spc_aaddr 是发生变化的那个对端地址
char addrstr[INET6_ADDRSTRLEN];
struct sockaddr_in* sin =
(struct sockaddr_in*)&spc->spc_aaddr;
inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
printf("SCTP_PEER_ADDR_CHANGE: %s, addr=%s, assoc=0x%x\n",
str, addrstr, (uint32_t)spc->spc_assoc_id);
break;
}
// ── 对端错误 ──────────────────────────────────────────────────
case SCTP_REMOTE_ERROR: {
struct sctp_remote_error* sre = &snp->sn_remote_error;
// sre_data 中含有具体错误块内容,这里只打印关联ID和错误码
printf("SCTP_REMOTE_ERROR: assoc=0x%x error=%d\n",
(uint32_t)sre->sre_assoc_id, sre->sre_error);
break;
}
// ── 发送失败(消息未送达对端)─────────────────────────────────
case SCTP_SEND_FAILED: {
struct sctp_send_failed* ssf = &snp->sn_send_failed;
// ssf_data 中含有未能发出的原始数据
printf("SCTP_SEND_FAILED: assoc=0x%x error=%d\n",
(uint32_t)ssf->ssf_assoc_id, ssf->ssf_error);
break;
}
// ── 适配层指示(握手阶段协商的应用层标识)────────────────────
case SCTP_ADAPTION_INDICATION: {
struct sctp_adaption_event* ae = &snp->sn_adaption_event;
printf("SCTP_ADAPTION_INDICATION: 0x%x\n",
(unsigned)ae->sai_adaption_ind);
break;
}
// ── 部分交付中止 ──────────────────────────────────────────────
case SCTP_PARTIAL_DELIVERY_EVENT: {
struct sctp_pdapi_event* pdapi = &snp->sn_pdapi_event;
if (pdapi->pdapi_indication == SCTP_PARTIAL_DELIVERY_ABORTED)
printf("SCTP_PARTIAL_DELIVERY_ABORTED\n");
else
printf("Unknown SCTP_PARTIAL_DELIVERY_EVENT 0x%x\n",
pdapi->pdapi_indication);
break;
}
// ── 对端发起优雅关闭 ──────────────────────────────────────────
case SCTP_SHUTDOWN_EVENT: {
struct sctp_shutdown_event* sse = &snp->sn_shutdown_event;
printf("SCTP_SHUTDOWN_EVENT: assoc=0x%x\n",
(uint32_t)sse->sse_assoc_id);
break;
}
default:
printf("Unknown notification event type=0x%x\n",
snp->sn_header.sn_type);
}
}
int main() {
printf("print_notification 函数定义完成,供服务器在收到通知时调用。\n");
return 0;
}
23.5 无序数据(Unordered Data)
SCTP默认在每个流(stream)内保证顺序交付。
若应用不需要顺序保证,可以使用 MSG_UNORDERED 标志,消息到达即可交付,无需等待前面丢失的消息。
// 发送无序数据(不分配流序号,对端收到即可交付)
sctp_sendmsg(sock_fd,
sendline, out_sz, // 数据和长度
to, tolen, // 目标地址
0, // ppid(协议ID,应用自定义)
MSG_UNORDERED, // 关键标志:无序发送
sri.sinfo_stream, // 使用哪个流(流号仍然有意义,用于分类)
0, 0); // ttl 和 context
有序 vs 无序对比:
有序(默认):
流0: [seq=1] [seq=2(丢)] [seq=3] ...
↓ ↓ ↓
交付 等待2 阻塞,等seq=2到达后才能继续
无序(MSG_UNORDERED):
流0: [no-seq] [no-seq] [no-seq] ...
↓ ↓ ↓
立即交付 立即交付 立即交付,互不影响
23.6 绑定地址子集(sctp_bindx)
TCP/UDP 的 bind() 只能绑定单个地址或通配地址(INADDR_ANY)。
SCTP 引入了 sctp_bindx(),允许绑定多个指定地址(必须同一端口)。
#include <netinet/sctp.h>
#include <netdb.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
/**
* sctp_bind_arg_list - 将命令行参数中的地址列表绑定到套接字
* @sock_fd: 已创建的SCTP套接字
* @argv : 地址字符串数组(如 "192.168.1.1" "10.0.0.1")
* @argc : 地址数量
*/
int sctp_bind_arg_list(int sock_fd, char** argv, int argc)
{
struct addrinfo* addr;
// 为每个地址分配 sockaddr_storage 大小的空间
// 使用最大尺寸结构(可容纳IPv4和IPv6),虽然浪费但简单安全
char* bindbuf = (char*)calloc(argc, sizeof(struct sockaddr_storage));
char* p = bindbuf;
char portbuf[10];
int addrcnt = 0;
// 端口号转字符串,供 getaddrinfo 使用
snprintf(portbuf, sizeof(portbuf), "%d", 9877); // SERV_PORT
for (int i = 0; i < argc; i++) {
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC; // 同时支持IPv4和IPv6
hints.ai_socktype = SOCK_SEQPACKET; // SCTP
// 将字符串地址解析为 sockaddr 结构
if (getaddrinfo(argv[i], portbuf, &hints, &addr) != 0) {
free(bindbuf);
return -1;
}
// 复制第一个解析结果(字面地址字符串只会有一个)
memcpy(p, addr->ai_addr, addr->ai_addrlen);
freeaddrinfo(addr);
addrcnt++;
p += addr->ai_addrlen; // 移动指针,构建紧密地址列表
}
// 将地址列表绑定到套接字
// SCTP_BINDX_ADD_ADDR 表示添加这些地址(相对的还有 SCTP_BINDX_REM_ADDR)
sctp_bindx(sock_fd, (struct sockaddr*)bindbuf, addrcnt,
SCTP_BINDX_ADD_ADDR);
free(bindbuf);
return 0;
}
int main(int argc, char* argv[]) {
if (argc < 2) {
fprintf(stderr, "用法: %s [地址列表...]\n", argv[0]);
return 1;
}
// 创建 IPv6 套接字(可同时接受IPv4映射地址,即双栈)
int sock_fd = socket(AF_INET6, SOCK_SEQPACKET, IPPROTO_SCTP);
if (sock_fd < 0) { perror("socket"); return 1; }
if (sctp_bind_arg_list(sock_fd, argv + 1, argc - 1) != 0) {
fprintf(stderr, "绑定地址集失败\n");
return 1;
}
printf("地址绑定成功\n");
return 0;
}
23.7 获取对端和本地地址信息
由于SCTP支持多宿主(multihoming),一个关联可能涉及多个本地地址和多个对端地址。
相关API
| 函数 | 作用 |
|---|---|
sctp_getpaddrs(fd, assoc_id, &sar) |
获取对端所有地址,返回地址数量 |
sctp_freepaddrs(sar) |
释放 sctp_getpaddrs 分配的内存 |
sctp_getladdrs(fd, assoc_id, &sal) |
获取本地所有地址,返回地址数量 |
sctp_freeladdrs(sal) |
释放 sctp_getladdrs 分配的内存 |
地址列表的内存布局(紧密排列)
返回的地址列表不是简单数组,而是紧密排列的 sockaddr 结构:
内存:
┌──────────────────┬──────────────────┬──────────────────┐
│ sockaddr_in(16B) │ sockaddr_in(16B) │ sockaddr_in6(28B)│
└──────────────────┴──────────────────┴──────────────────┘
第1个地址(IPv4) 第2个地址(IPv4) 第3个地址(IPv6)
每次需要根据 ss_family 判断当前地址结构的大小,然后指针才能正确移动。
打印地址列表函数
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
/**
* sctp_print_addresses - 打印紧密排列的地址列表
* @addrs: sctp_getpaddrs/sctp_getladdrs 返回的地址列表
* @num : 地址数量
*/
void sctp_print_addresses(struct sockaddr_storage* addrs, int num)
{
struct sockaddr_storage* ss = addrs;
int salen;
char buf[INET6_ADDRSTRLEN];
for (int i = 0; i < num; i++) {
// 根据地址族打印地址
if (ss->ss_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)ss;
inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf));
printf("%s:%d\n", buf, ntohs(sin->sin_port));
salen = sizeof(struct sockaddr_in); // IPv4结构大小:16字节
} else if (ss->ss_family == AF_INET6) {
struct sockaddr_in6* sin6 = (struct sockaddr_in6*)ss;
inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf));
printf("[%s]:%d\n", buf, ntohs(sin6->sin6_port));
salen = sizeof(struct sockaddr_in6); // IPv6结构大小:28字节
} else {
fprintf(stderr, "未知地址族: %d\n", ss->ss_family);
return;
}
// 移动指针到下一个地址(紧密排列,必须用字节偏移)
ss = (struct sockaddr_storage*)((char*)ss + salen);
}
}
int main() {
printf("sctp_print_addresses 定义完成。\n");
return 0;
}
23.8 通过IP地址查找关联ID
#include <netinet/sctp.h>
#include <string.h>
/**
* sctp_address_to_associd - 将一个对端地址转换为关联ID
*
* 当应用层没有记录关联ID,但知道对端地址时,
* 可以通过 SCTP_PEER_ADDR_PARAMS 选项查询关联ID。
*
* @sock_fd: 套接字描述符
* @sa : 对端地址
* @salen : 地址长度
* 返回值: 关联ID(0表示查询失败或无效)
*/
sctp_assoc_t sctp_address_to_associd(int sock_fd,
struct sockaddr* sa,
socklen_t salen)
{
struct sctp_paddrparams sp;
int siz = sizeof(struct sctp_paddrparams);
// 清零结构体,确保未初始化字段不影响结果
memset(&sp, 0, siz);
// 将目标地址填入查询参数
memcpy(&sp.spp_address, sa, salen);
// 使用 sctp_opt_info 代替 getsockopt,
// 因为 SCTP_PEER_ADDR_PARAMS 需要同时传入和传出数据
// (既要告诉内核查哪个地址,又要从内核拿回关联ID)
sctp_opt_info(sock_fd, 0, SCTP_PEER_ADDR_PARAMS, &sp, &siz);
// 返回关联ID(若失败,memset(0)保证返回0,表示无效)
return sp.spp_assoc_id;
}
23.9 心跳机制与地址故障检测
SCTP默认开启心跳(heartbeat),类似TCP的keep-alive,但功能更强。
心跳相关参数
| 参数 | 含义 |
|---|---|
spp_hbinterval = 0 即 SCTP_NO_HB |
禁用心跳 |
spp_hbinterval = 0xffffffff 即 SCTP_ISSUE_HB |
立即发送一次心跳(按需) |
spp_hbinterval = N(毫秒) |
设置心跳间隔为N毫秒 |
spp_pathmaxrxt |
某地址连续失败N次后,标记该地址为不可达 |
心跳间隔计算公式:
Theartbeat=Ninterval+RTOcurrent+jitterrandomT_{heartbeat} = N_{interval} + RTO_{current} + jitter_{random}Theartbeat=Ninterval+RTOcurrent+jitterrandom
其中 NintervalN_{interval}Ninterval 是设置的间隔值,RTOcurrentRTO_{current}RTOcurrent 是当前重传超时值,jitterrandomjitter_{random}jitterrandom 是随机抖动。
#include <netinet/sctp.h>
#include <string.h>
/**
* heartbeat_action - 控制心跳行为
* @sock_fd: 套接字描述符
* @sa : 目标对端地址(针对特定地址设置)
* @salen : 地址长度
* @value : SCTP_NO_HB(禁用)/ SCTP_ISSUE_HB(立即发)/ 毫秒数
*/
int heartbeat_action(int sock_fd, struct sockaddr* sa,
socklen_t salen, unsigned int value)
{
struct sctp_paddrparams sp;
memset(&sp, 0, sizeof(sp));
// 设置心跳间隔值
sp.spp_hbinterval = value;
// 设置目标地址
memcpy(&sp.spp_address, sa, salen);
// spp_pathmaxrxt 保持0,表示不修改当前重传限制
// 应用设置
if (setsockopt(sock_fd, IPPROTO_SCTP,
SCTP_PEER_ADDR_PARAMS, &sp, sizeof(sp)) < 0) {
return -1;
}
return 0;
}
23.10 剥离关联(sctp_peeloff)
一对多风格的优点:单文件描述符管理所有关联,适合迭代服务器。
缺点:难以构建并发服务器(fork子进程后,所有关联仍共享同一个fd)。sctp_peeloff 解决了这个问题:将某个关联从一对多套接字中剥离,生成独立的一对一套接字,可以传给子进程。
#include <netinet/sctp.h>
#include <unistd.h>
#include <stdio.h>
// 假设 str_echo 已在其他文件定义(与TCP版本相同)
void str_echo(int fd);
int server_loop(int sock_fd) {
char readbuf[4096];
struct sockaddr_in cliaddr;
socklen_t len;
struct sctp_sndrcvinfo sri;
int msg_flags;
sctp_assoc_t assoc;
int connfd;
pid_t childpid;
int rd_sz;
for (;;) {
len = sizeof(cliaddr);
// 接收第一条消息
rd_sz = sctp_recvmsg(sock_fd, readbuf, sizeof(readbuf),
(struct sockaddr*)&cliaddr, &len,
&sri, &msg_flags);
// 原样回显给客户端
sctp_sendmsg(sock_fd, readbuf, rd_sz,
(struct sockaddr*)&cliaddr, len,
sri.sinfo_ppid,
sri.sinfo_flags,
sri.sinfo_stream, 0, 0);
// 通过对端地址查找关联ID
assoc = sctp_address_to_associd(sock_fd,
(struct sockaddr*)&cliaddr, len);
if ((int)assoc == 0) {
fprintf(stderr, "无法获取关联ID\n");
continue;
}
// 将该关联剥离为独立的一对一套接字
// 原 sock_fd 上其他关联不受影响
connfd = sctp_peeloff(sock_fd, assoc);
if (connfd == -1) {
perror("sctp_peeloff");
continue;
}
if ((childpid = fork()) == 0) {
// 子进程:不需要监听套接字
close(sock_fd);
// 用与TCP相同的 str_echo 处理后续交互
str_echo(connfd);
_exit(0);
} else {
// 父进程:不需要已剥离的套接字
close(connfd);
}
}
return 0;
}
23.11 控制超时参数
SCTP有7个关键的超时控制参数,影响故障检测速度:
| 字段 | 默认值 | 单位 | 含义 |
|---|---|---|---|
srto_min |
1000 | 毫秒 | 最小重传超时 |
srto_max |
60000 | 毫秒 | 最大重传超时 |
srto_initial |
3000 | 毫秒 | 初始重传超时 |
sinit_max_init_timeo |
3000 | 毫秒 | INIT阶段最大超时 |
sinit_max_attempts |
8 | 次 | INIT最大重试次数 |
spp_pathmaxrxt |
5 | 次 | 每个地址最大重传次数 |
sasoc_asocmaxrxt |
10 | 次 | 整个关联最大重传次数 |
场景1:单宿主对端断网(连接建立阶段)
INIT重传时序(指数退避,上限 Tmax=60000T_{max} = 60000Tmax=60000 ms):
Ttotal=∑i=0N−1min(Tinitial⋅2i, Tmax_init_timeo)T_{total} = \sum_{i=0}^{N-1} \min(T_{initial} \cdot 2^i,\ T_{max\_init\_timeo})Ttotal=i=0∑N−1min(Tinitial⋅2i, Tmax_init_timeo)
默认参数(8次重试,初始3秒,上限60秒):
Ttotal=3+6+12+24+48+60+60+60=273 秒T_{total} = 3+6+12+24+48+60+60+60 = 273 \text{ 秒}Ttotal=3+6+12+24+48+60+60+60=273 秒
调优方案1:减少重试次数 N=4N = 4N=4:
Ttotal=3+6+12+24=45 秒T_{total} = 3+6+12+24 = 45 \text{ 秒}Ttotal=3+6+12+24=45 秒
调优方案2:降低最大超时 Tmax_init_timeo=20T_{max\_init\_timeo} = 20Tmax_init_timeo=20 秒:
Ttotal=3+6+12+20+20+20+20+20=121 秒T_{total} = 3+6+12+20+20+20+20+20 = 121 \text{ 秒}Ttotal=3+6+12+20+20+20+20+20=121 秒
场景2:多宿主对端断电(数据传输阶段)
假设两个端点:本地有 IPA,IPBIP_A, IP_BIPA,IPB,对端有 IPX,IPYIP_X, IP_YIPX,IPY。
超时以两个地址交替触发,上限 Trto_max=60T_{rto\_max} = 60Trto_max=60 秒,最多 Nassoc=10N_{assoc} = 10Nassoc=10 次重传:
Ttotal=∑k=152k−1⋅(IPA+IPB)=1+1+2+2+4+4+8+8+16+16=62 秒T_{total} = \sum_{k=1}^{5} 2^{k-1} \cdot (IP_A + IP_B) = 1+1+2+2+4+4+8+8+16+16 = 62 \text{ 秒}Ttotal=k=1∑52k−1⋅(IPA+IPB)=1+1+2+2+4+4+8+8+16+16=62 秒
注意:由于在达到 Trto_maxT_{rto\_max}Trto_max 之前已超过 NassocN_{assoc}Nassoc 次,所以 srto_max 的默认值不起作用。
调优方案:减少 NassocN_{assoc}Nassoc 为 8 次:
Ttotal=1+1+2+2+4+4+8+8=30 秒T_{total} = 1+1+2+2+4+4+8+8 = 30 \text{ 秒}Ttotal=1+1+2+2+4+4+8+8=30 秒
23.12 何时选择SCTP而非TCP
SCTP的8大优势
1. 原生支持多宿主(Multihoming)
→ 多个网络接口自动提供冗余,应用无需额外处理
2. 消除队头阻塞(Head-of-line blocking)
→ 多个流并行传输,一条流丢包不影响其他流
3. 保留消息边界
→ 应用层发一条消息,收到的也是一条完整消息,无需自己分帧
4. 无序消息服务
→ 不需要顺序的数据可以立即交付,无需等待前面丢失的包
5. 部分可靠服务(某些实现)
→ 通过 sinfo_timetolive 为每条消息设置生命周期,超时可丢弃
6. 从TCP迁移简单
→ 一对一风格接口与TCP几乎相同,改动极少
7. 包含TCP的主要特性
→ 正向确认、重传、重排序、流量控制、慢启动、拥塞避免、SACK
8. 丰富的可调参数
→ 每个关联独立配置,灵活匹配应用需求
SCTP不适合的场景
- 纯字节流应用(telnet, rlogin, ssh):TCP能更高效地将字节打包进IP报文,SCTP保留消息边界反而带来额外开销
- 需要半关闭(half-close)的应用:SCTP不支持半关闭状态
- 需要紧急数据(urgent data)的应用:SCTP无此特性(可用单独的流模拟,但语义不完全等同)
第24章:TCP带外数据(Out-of-Band Data)
24.1 概述
带外数据(OOB)的核心思想:
当一端发生紧急情况,需要绕过正常的数据队列和流量控制,立即通知对端。
TCP并没有真正的带外数据,它提供的是紧急模式(urgent mode),
套接字API将其包装成了带外数据的概念。
24.2 TCP带外数据工作原理
发送侧:紧急指针的产生
假设发送方缓冲区中已有N字节普通数据待发送:
发送前(缓冲区):
┌─────────────────────────┐
│ 1 2 3 ... N │ 等待发送的普通数据
└─────────────────────────┘
发送 send(fd, "a", 1, MSG_OOB) 后:
┌─────────────────────────────┐
│ 1 2 3 ... N [OOB:a] │
└─────────────────────────────┘
↑
TCP紧急指针
指向OOB字节之后一位
TCP头部:URG标志 = 1,urgent_offset = (OOB字节位置 + 1) - seq
重要细节:
- URG标志和紧急偏移立即发送给对端(不受流量控制限制)
- OOB数据本身未必立即发出(仍然受流量控制约束,缓冲区满则等待)
- 多字节OOB:
send(fd, "abc", 3, MSG_OOB)中,只有最后一个字节c是OOB字节
接收侧:处理流程
错误情形汇总
| 错误 | 原因 |
|---|---|
EINVAL |
调用了 MSG_OOB 但对端未发送OOB数据 |
EWOULDBLOCK |
OOB通知已到,但OOB字节本身还未到达 |
EINVAL |
试图第二次读取同一个OOB字节(已被清除) |
EINVAL |
设置了 SO_OOBINLINE 却仍然使用 MSG_OOB 读取 |
发送示例(含注释)
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
// 连接到服务器(省略错误处理简化示例)
int tcp_connect(const char* host, const char* port);
int main(int argc, char* argv[]) {
if (argc != 3) {
fprintf(stderr, "用法: %s <host> <port>\n", argv[0]);
return 1;
}
int sockfd = tcp_connect(argv[1], argv[2]);
// 发送3字节普通数据
write(sockfd, "123", 3);
printf("发送了3字节普通数据\n");
sleep(1); // 暂停确保每次发送作为独立TCP报文段
// 发送1字节OOB数据 '4'
// TCP会立即发送URG通知,但 '4' 本身可能稍后才到对端
send(sockfd, "4", 1, MSG_OOB);
printf("发送了1字节OOB数据\n");
sleep(1);
// 发送2字节普通数据
write(sockfd, "56", 2);
printf("发送了2字节普通数据\n");
sleep(1);
// 再发1字节OOB数据 '7'
send(sockfd, "7", 1, MSG_OOB);
printf("发送了1字节OOB数据\n");
sleep(1);
write(sockfd, "89", 2);
printf("发送了2字节普通数据\n");
close(sockfd);
return 0;
}
使用SIGURG接收OOB示例(含注释)
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
// 全局变量:信号处理函数中需要访问
int listenfd, connfd;
// SIGURG 信号处理函数
void sig_urg(int signo) {
char buff[100];
printf("收到 SIGURG 信号\n");
// 使用 MSG_OOB 从独立的1字节OOB缓冲区读取
// 注意:从信号处理函数调用printf/recv并不严格安全,
// 生产代码应设置标志位,在主循环中处理
int n = recv(connfd, buff, sizeof(buff) - 1, MSG_OOB);
buff[n] = '\0';
printf("读到 %d 字节OOB数据: %s\n", n, buff);
}
int main(int argc, char* argv[]) {
char buff[100];
int n;
if (argc < 2) {
fprintf(stderr, "用法: %s <port>\n", argv[0]);
return 1;
}
// 创建监听套接字(省略细节)
struct sockaddr_in servaddr, cliaddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(atoi(argv[1]));
bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
listen(listenfd, 5);
socklen_t len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &len);
// 必须在 accept 之后才设置信号处理和套接字属主
// 原因:connfd 在 accept 返回前不存在,信号处理函数无法使用它
signal(SIGURG, sig_urg);
// 设置套接字属主为当前进程,SIGURG才会发给我们
// F_SETOWN: 设置拥有者(也可以是进程组,用负值表示)
fcntl(connfd, F_SETOWN, getpid());
// 主循环:读取普通数据
for (;;) {
n = read(connfd, buff, sizeof(buff) - 1);
if (n == 0) {
printf("收到 EOF\n");
break;
}
buff[n] = '\0';
printf("读到 %d 字节普通数据: %s\n", n, buff);
}
return 0;
}
24.3 sockatmark 函数
sockatmark 用于判断当前读指针是否正好在OOB标记(out-of-band mark)处。
#include <sys/socket.h>
int sockatmark(int sockfd);
// 返回 1:当前位置在OOB标记处
// 返回 0:不在标记处
// 返回 -1:出错
内部实现(使用 ioctl):
#include <sys/ioctl.h>
#include <sys/socket.h>
int sockatmark(int fd) {
int flag;
// SIOCATMARK: 查询当前读位置是否在OOB标记处
// flag 非0 表示在标记处
if (ioctl(fd, SIOCATMARK, &flag) < 0)
return -1;
return (flag != 0);
}
OOB标记的两个关键特性
特性1:标记位于OOB字节"之前"
发送方发送顺序:
[1][2][3] → OOB[4] → [5]
接收方的标记位置:
┌───┬───┬───┬(mark)┬───┬───┐
│ 1 │ 2 │ 3 │ 4 │ 5 │ │
└───┴───┴───┴──────┴───┴───┘
↑
sockatmark 在读取'4'之前返回1
若设置了 SO_OOBINLINE:mark 在 '4' 之前,即读完 '3' 后 sockatmark=1
若未设置 SO_OOBINLINE:mark 在 '5' 之前(OOB字节被单独取走),读完 '3' 后 mark=1
特性2:read 总是在OOB标记处停下
场景:缓冲区有100字节,但距OOB标记只有5字节
read(fd, buf, 100) 实际只返回5字节(停在标记处)
目的:让应用有机会调用 sockatmark 检测标记
使用 SO_OOBINLINE + sockatmark 完整示例
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
int main(int argc, char* argv[]) {
int listenfd, connfd, n, on = 1;
char buff[100];
if (argc < 2) {
fprintf(stderr, "用法: %s <port>\n", argv[0]);
return 1;
}
listenfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(atoi(argv[1]));
// 必须在 listen/accept 之前设置 SO_OOBINLINE,
// 这样 accept 产生的连接套接字会继承该选项,
// 避免OOB字节在三次握手完成后、accept返回前就到达时被错误处理
setsockopt(listenfd, SOL_SOCKET, SO_OOBINLINE, &on, sizeof(on));
bind(listenfd, (struct sockaddr*)&addr, sizeof(addr));
listen(listenfd, 5);
socklen_t len = sizeof(addr);
connfd = accept(listenfd, (struct sockaddr*)&addr, &len);
// sleep 5秒,让发送方的所有数据都到达接收方缓冲区
// 这样可以演示"read在OOB标记处自动停止"的特性
sleep(5);
for (;;) {
// 在每次 read 之前检查是否到达OOB标记
if (sockatmark(connfd))
printf("当前位置在 OOB 标记处\n");
n = read(connfd, buff, sizeof(buff) - 1);
if (n == 0) {
printf("收到 EOF\n");
break;
}
buff[n] = '\0';
printf("读到 %d 字节: %s\n", n, buff);
}
return 0;
}
运行结果分析(对应发送方依次发送 “123” + OOB’4’ + “5”):
读到 3 字节: 123 ← read 在OOB标记前停下,只返回3字节
当前位置在 OOB 标记处 ← sockatmark 返回1
读到 2 字节: 45 ← '4' 是OOB字节但因SO_OOBINLINE在普通流中,'5'是普通数据
收到 EOF
24.4 使用 select 处理OOB数据
select 用**异常条件集合(exception set)**来通知OOB数据的到来。
错误版本(无限触发问题)
问题:select 会持续返回异常条件,
直到进程读过了OOB标记(即读取OOB之后的普通数据),
而不仅仅是读取OOB数据本身。
错误流程:
select → 异常条件 → recv(MSG_OOB) 读OOB → 再次 select
→ 异常条件仍然存在 → 再次 recv(MSG_OOB) → EINVAL(已读过了)
正确版本:使用 justreadoob 标志
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
int main(int argc, char* argv[]) {
int listenfd, connfd, n;
// justreadoob:标记"刚刚读了OOB数据,暂时不要再select异常条件"
int justreadoob = 0;
char buff[100];
fd_set rset, xset;
if (argc < 2) { fprintf(stderr, "用法: %s <port>\n", argv[0]); return 1; }
listenfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(atoi(argv[1]));
bind(listenfd, (struct sockaddr*)&addr, sizeof(addr));
listen(listenfd, 5);
socklen_t len = sizeof(addr);
connfd = accept(listenfd, (struct sockaddr*)&addr, &len);
FD_ZERO(&rset);
FD_ZERO(&xset);
for (;;) {
FD_SET(connfd, &rset);
// 关键:只有在"没有刚刚读OOB"时,才监听异常条件
// 这样避免了读完OOB后异常条件仍然触发导致EINVAL的问题
if (justreadoob == 0)
FD_SET(connfd, &xset);
select(connfd + 1, &rset, NULL, &xset, NULL);
// 处理OOB数据(异常条件触发)
if (FD_ISSET(connfd, &xset)) {
n = recv(connfd, buff, sizeof(buff) - 1, MSG_OOB);
buff[n] = '\0';
printf("读到 %d 字节OOB数据: %s\n", n, buff);
// 标记"刚刚读了OOB",并清除异常条件集合
justreadoob = 1;
FD_CLR(connfd, &xset);
}
// 处理普通数据(可读条件触发)
if (FD_ISSET(connfd, &rset)) {
n = read(connfd, buff, sizeof(buff) - 1);
if (n == 0) { printf("收到 EOF\n"); break; }
buff[n] = '\0';
printf("读到 %d 字节普通数据: %s\n", n, buff);
// 读了普通数据后,可以再次监听OOB(标记已过)
justreadoob = 0;
}
}
return 0;
}
24.5 流控情况下的OOB通知
这是OOB机制的一个重要特性:即使接收方缓冲区已满(窗口为0),URG通知仍然能发出去。
场景:
发送方缓冲区:32768字节
接收方缓冲区:4096字节
时序:
① 发送方发16384字节普通数据
② 接收方缓冲区满,窗口=0,TCP停止发送数据
③ 发送方发送1字节OOB数据 'a'
→ TCP立即发送包含URG标志的报文段(即使数据本身发不出去)
→ 接收方收到URG通知,触发SIGURG
④ 接收方在SIGURG处理中尝试 recv(MSG_OOB)
→ 返回 EWOULDBLOCK('a' 本身还卡在发送方)
⑤ 解决方法:接收方先读走一些普通数据,腾出缓冲区空间
→ 接收方TCP广告非零窗口
→ 发送方把OOB字节 'a' 和后续数据发出
→ 接收方再次 recv(MSG_OOB) 成功
24.6 单一OOB标记限制
TCP每个连接只有一个OOB标记。若发送方连续发送两个OOB字节,新的标记会覆盖旧的。
示例演示:
发送方依次发送(无延迟):
"123" → OOB'4' → "5" → OOB'6' → "7"
接收方(sleep 5秒后读取,所有数据已到达):
读到 5 字节: 12345 ← OOB'4' 的标记被 OOB'6' 覆盖,'4' 混入普通流
当前在 OOB 标记处 ← 此处是 '6' 的标记
读到 2 字节: 67
收到 EOF
内存示意图:
OOB'4' 的标记被覆盖前:
┌───┬───┬───┬(mark4)┬───┬───┬───┐
│ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │
└───┴───┴───┴───────┴───┴───┴───┘
OOB'6' 到达,覆盖标记:
┌───┬───┬───┬───┬───┬(mark6)┬───┐
│ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │
└───┴───┴───┴───┴───┴───────┴───┘
'4' 不再是OOB,已混入普通数据流
24.7 实际应用:rlogin 如何使用OOB
核心思路:
- 服务端可能已经缓冲了大量待发给客户端的输出(最多一个窗口大小)
- 收到OOB通知后,服务端发送一个特殊字节,告诉客户端"丢弃到这里为止的所有输出"
- 客户端收到SIGURG,读数据直到遇到OOB标记,将这段数据全部丢弃
24.8 OOB数据三要素总结
TCP的带外数据实际上传递了三类不同的信息:
| 要素 | TCP层面 | 套接字API层面 | 传递时机 |
|---|---|---|---|
| 紧急通知 | URG标志 | SIGURG信号 / select异常 | 立即(不受流控) |
| OOB标记 | urgent_offset字段 | sockatmark() 返回值 | 随URG通知一起 |
| OOB数据值 | 数据字节本身 | recv(MSG_OOB) 的内容 | 受流控,可能延迟 |
核心约束(每连接只有一份):
- 只有一个紧急指针(urgent pointer)
- 只有一个OOB标记(mark)
- 只有一个1字节的OOB缓冲区(未设置
SO_OOBINLINE时)
附:关键概念速查
SCTP关联(Association)≈ TCP连接,但支持多地址
SCTP流(Stream)≈ 关联内部的有序子通道,一个关联可有多个流
MSG_EOR:消息结束标志(End Of Record),SCTP特有
MSG_NOTIFICATION:通知消息标志,SCTP特有
MSG_UNORDERED:无序发送标志,SCTP特有
MSG_OOB:带外数据标志,TCP特有
SO_OOBINLINE:将OOB数据放入普通流,需要用 sockatmark 定位
SIGURG:OOB数据到达时内核发给进程的信号
F_SETOWN:设置套接字属主,决定 SIGURG 发给谁
第25章 & 第26章:信号驱动I/O 与 线程 详解
第25章:信号驱动I/O(Signal-Driven I/O)
25.1 概念辨析:三种"异步"的区别
很多人容易混淆这三个概念,先把它们理清楚:
阻塞I/O(默认):
进程调用 read() → 数据没来就睡觉 → 数据来了被唤醒 → 读完返回
████████████████░░░░░░░░░░░░░░░░████
[ 等待数据到来,进程阻塞中... ] [处理]
非阻塞I/O(第16章):
进程调用 read() → 数据没来立刻返回 EWOULDBLOCK → 进程自己轮询
░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ████
[不停地问内核"有数据吗?",浪费CPU] [处理]
信号驱动I/O(本章):
进程告诉内核"有数据了发SIGIO给我" → 进程去干别的 → SIGIO来了再读
进程: ────────────────── 干其他事情 ──────────────────────[读取]
内核: ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ 数据来了! → 发SIGIO
真正的异步I/O(POSIX aio_XXX):
进程告诉内核"帮我读,读完通知我" → 进程去干别的 → 内核读完通知
进程: ──────────────────── 干其他事情 ──────────────────────[处理结果]
内核: ░░░░░░░░░░░░░░░░░░░░░ [内核负责读数据,放到用户缓冲区] → 通知
关键区别:信号驱动I/O中,内核只是通知"可以读了",进程自己还要去读;
而真正的异步I/O,内核帮你把数据读好放到你的缓冲区,再通知你。
25.2 信号驱动I/O的三步设置
使用信号驱动I/O需要按顺序做三件事:
注意顺序:必须先设置信号处理函数,再设置属主。
在SVR4系统上,SIGIO等同于SIGPOLL,其默认动作是终止进程,
若先设置属主再设置处理函数,中间有信号到来就会把进程杀死。
UDP套接字触发SIGIO的条件(简单,推荐使用)
| 触发条件 | 处理方式 |
|---|---|
| 数据报到达 | 调用 recvfrom 读取 |
| 套接字发生异步错误 | 调用 recvfrom 获取错误 |
TCP套接字触发SIGIO的条件(复杂,几乎无法使用)
| 触发条件 |
|---|
| 监听套接字上有新连接完成 |
| 断开连接请求已发起 |
| 断开连接请求已完成 |
| 连接的半边已关闭 |
| 数据到达 |
| 数据已发出(输出缓冲区有空间) |
| 发生异步错误 |
TCP的问题:触发条件太多,无法区分是哪种事件,信号处理函数不知道该做什么。
实际中信号驱动I/O只推荐用于UDP套接字或监听TCP套接字(只有新连接一种事件)。
25.3 NTP服务器的设计思路
NTP(网络时间协议)是信号驱动I/O在UDP上的经典应用场景。
普通UDP服务器(左侧)vs NTP风格(右侧):
普通UDP服务器: NTP风格(信号驱动):
主循环 SIGIO处理函数
│ │
├─ recvfrom(阻塞等待) ├─ recvfrom(立即读取)
│ ├─ 记录精确到达时间戳
├─ 处理请求(耗时) └─ 放入队列
│
└─ sendto 主循环
│
└─ 从队列取出 → 处理 → 发送回复
为什么NTP要这样设计?
因为NTP需要记录数据报到达的精确时间戳,如果在主循环里处理其他逻辑后再读取,
时间戳就不准了。SIGIO处理函数一收到信号就立刻读取并记录时间,确保时间戳精度。
25.4 完整示例:使用SIGIO的UDP回显服务器
数据结构设计
环形队列(大小为QSIZE=8):
索引: 0 1 2 3 4 5 6 7
┌────┬────┬────┬────┬────┬────┬────┬────┐
│DG │DG │DG │ │ │ │ │ │
└────┴────┴────┴────┴────┴────┴────┴────┘
↑ ↑
iget iput
(主循环 (信号处理函数
取数据的位置) 写入数据的位置)
每个DG结构:
┌─────────────────────────────────────────┐
│ dg_data → 指向数据报内容的指针(堆内存)│
│ dg_len → 数据报长度 │
│ dg_sa → 指向客户端地址的指针(堆内存)│
│ dg_salen → 地址结构长度 │
└─────────────────────────────────────────┘
完整可运行代码
// 编译: g++ -o udpserv_sigio udpserv_sigio.cpp
// 运行: ./udpserv_sigio 9877
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <errno.h>
#include <cstdlib>
#include <cstring>
#include <cstdio>
// ── 常量定义 ──────────────────────────────────────────────────
#define QSIZE 8 // 环形队列大小(同时缓存最多8个数据报)
#define MAXDG 4096 // 单个数据报的最大字节数
// ── 数据报队列元素 ────────────────────────────────────────────
typedef struct {
void* dg_data; // 指向数据报内容(由malloc分配)
size_t dg_len; // 数据报实际长度
struct sockaddr* dg_sa; // 指向客户端地址结构(由malloc分配)
socklen_t dg_salen; // 客户端地址结构长度
} DG;
// ── 全局变量(信号处理函数和主循环共享)──────────────────────
static int sockfd; // 套接字描述符
static DG dg[QSIZE]; // 环形队列
static long cntread[QSIZE + 1]; // 诊断计数器:每次SIGIO读几个报文
static int iget; // 主循环下一个要处理的队列索引
static int iput; // 信号处理函数下一个写入的索引
static int nqueue; // 队列中等待处理的数据报数量
static socklen_t clilen; // 客户端地址结构的最大长度
// ── 信号处理函数声明 ─────────────────────────────────────────
static void sig_io(int signo);
static void sig_hup(int signo);
// ── SIGHUP处理:打印诊断信息 ─────────────────────────────────
static void sig_hup(int signo)
{
// 打印每次SIGIO信号读取到的数据报数量分布(直方图)
for (int i = 0; i <= QSIZE; i++)
printf("cntread[%d] = %ld\n", i, cntread[i]);
}
// ── SIGIO处理:从套接字读取所有待读数据报,放入队列 ─────────
static void sig_io(int signo)
{
ssize_t len;
int nread; // 本次SIGIO信号共读了几个数据报(用于统计)
DG* ptr;
for (nread = 0; ; ) {
// 队列已满,说明主循环处理太慢,这里直接退出(生产代码应更优雅处理)
if (nqueue >= QSIZE) {
fprintf(stderr, "receive queue overflow\n");
exit(1);
}
ptr = &dg[iput]; // 取得当前写入位置
ptr->dg_salen = clilen; // 重置地址长度(recvfrom会修改它)
// 非阻塞读取:因为套接字已设置为FIONBIO(非阻塞)
// 若无数据则返回 EWOULDBLOCK,表示当前没有数据了
len = recvfrom(sockfd,
ptr->dg_data, MAXDG,
0,
ptr->dg_sa, &ptr->dg_salen);
if (len < 0) {
if (errno == EWOULDBLOCK)
break; // 正常:没有更多数据了,退出循环
else {
perror("recvfrom error");
exit(1);
}
}
// 成功读到一个数据报
ptr->dg_len = len;
nread++;
nqueue++; // 队列中数据报数量加1
// 循环推进iput,超过QSIZE则回绕到0(环形队列)
if (++iput >= QSIZE)
iput = 0;
}
// 统计:记录本次信号读了几个数据报
cntread[nread]++;
}
// ── 主服务函数 ────────────────────────────────────────────────
void dg_echo(int sockfd_arg, struct sockaddr* pcliaddr, socklen_t clilen_arg)
{
const int on = 1;
// 信号掩码相关变量
sigset_t zeromask; // 空掩码(所有信号均不屏蔽)
sigset_t newmask; // 包含SIGIO的掩码(用于屏蔽SIGIO)
sigset_t oldmask; // 保存调用前的掩码(用于恢复)
sockfd = sockfd_arg;
clilen = clilen_arg;
// ── 初始化环形队列:为每个槽分配内存 ────────────────────
for (int i = 0; i < QSIZE; i++) {
dg[i].dg_data = malloc(MAXDG);
dg[i].dg_sa = (struct sockaddr*)malloc(clilen);
dg[i].dg_salen = clilen;
}
iget = iput = nqueue = 0;
// ── 设置信号处理函数 ──────────────────────────────────────
signal(SIGHUP, sig_hup); // SIGHUP:打印诊断信息(发 kill -HUP <pid> 触发)
signal(SIGIO, sig_io); // SIGIO:数据报到达时触发
// ── 设置套接字属主(SIGIO发给本进程)────────────────────
fcntl(sockfd, F_SETOWN, getpid());
// ── 开启信号驱动I/O(FIOASYNC)──────────────────────────
// POSIX标准方式是 fcntl(sockfd, F_SETFL, O_ASYNC),
// 但不是所有系统都支持,用 ioctl FIOASYNC 更通用
ioctl(sockfd, FIOASYNC, &on);
// ── 设置非阻塞(FIONBIO)────────────────────────────────
// 原因:POSIX信号不排队,一次SIGIO可能对应多个数据报,
// 必须循环读到 EWOULDBLOCK 才能确保不遗漏
ioctl(sockfd, FIONBIO, &on);
// ── 初始化信号集 ─────────────────────────────────────────
sigemptyset(&zeromask); // 空集:用于sigsuspend时临时解除所有屏蔽
sigemptyset(&oldmask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGIO); // newmask 中只有 SIGIO 位为1
// ── 屏蔽SIGIO,进入主循环 ────────────────────────────────
// 屏蔽SIGIO并保存旧掩码到oldmask,防止检查nqueue和sigsuspend之间的竞态
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
for (;;) {
// 当队列为空时,用sigsuspend等待SIGIO
// sigsuspend做三件事(原子操作):
// 1. 将当前信号掩码设为zeromask(解除对SIGIO的屏蔽)
// 2. 睡眠,等待信号
// 3. 信号处理函数返回后,恢复之前的掩码(即重新屏蔽SIGIO)
while (nqueue == 0)
sigsuspend(&zeromask);
// ── 走到这里说明nqueue > 0,SIGIO已被重新屏蔽 ────────
// 解除屏蔽,允许新SIGIO进来(在sendto期间信号处理函数可以执行)
sigprocmask(SIG_SETMASK, &oldmask, NULL);
// 回显数据报给客户端
sendto(sockfd,
dg[iget].dg_data, dg[iget].dg_len,
0,
dg[iget].dg_sa, dg[iget].dg_salen);
// 推进iget,超过QSIZE则回绕(环形队列)
if (++iget >= QSIZE)
iget = 0;
// ── 重新屏蔽SIGIO,修改共享变量nqueue ────────────────
// nqueue被主循环和信号处理函数共享,修改时必须屏蔽信号
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
nqueue--;
}
}
int main(int argc, char* argv[])
{
if (argc != 2) {
fprintf(stderr, "用法: %s <端口号>\n", argv[0]);
return 1;
}
int listenfd = socket(AF_INET, SOCK_DGRAM, 0);
if (listenfd < 0) { perror("socket"); return 1; }
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(atoi(argv[1]));
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
perror("bind"); return 1;
}
struct sockaddr_in cliaddr;
dg_echo(listenfd, (struct sockaddr*)&cliaddr, sizeof(cliaddr));
return 0;
}
为什么信号不排队会导致漏读?
场景演示:
时刻1: 数据报A到达 → SIGIO发出 → sig_io开始执行
时刻2: 执行期间,数据报B和C同时到达 → SIGIO触发2次
但SIGIO此时被屏蔽,只记录"有1次未处理的SIGIO"(不是2次!)
时刻3: sig_io第一次执行:读A,循环继续读B,EWOULDBLOCK退出(C还没到)
时刻4: 第二次sig_io执行(因为记录了1次未处理):读C,EWOULDBLOCK退出
→ 正确,C被读到了
但若没有循环读取(只读一次就返回):
时刻3: sig_io第一次:只读A,返回
时刻4: 第二次sig_io:只读B,返回
→ C永远留在接收缓冲区,直到下一个数据报D到来才被读出
→ 这就是为什么必须设置非阻塞 + 循环读到EWOULDBLOCK
诊断计数器含义
运行6个并发客户端后的输出示例:
cntread[0] = 0 ← 信号触发时没读到数据(SIGIO在sig_io执行中再次触发,
返回时已无数据)
cntread[1] = 15899 ← 最常见:一次SIGIO读了1个数据报
cntread[2] = 2099 ← 一次SIGIO读了2个数据报
cntread[3] = 515 ← 一次SIGIO读了3个数据报
cntread[4] = 57 ← 一次SIGIO读了4个数据报
验证总数:15899×1+2099×2+515×3+57×4=21870=6×364515899 \times 1 + 2099 \times 2 + 515 \times 3 + 57 \times 4 = 21870 = 6 \times 364515899×1+2099×2+515×3+57×4=21870=6×3645,正确。
竞态条件分析
为什么检查nqueue时必须屏蔽SIGIO?
错误代码(有竞态):
主循环: if (nqueue == 0) ← 检查到nqueue=0
↓(此时SIGIO到来,sig_io把nqueue设为1)
主循环: sigsuspend() ← 永远睡下去,因为错过了那次SIGIO
正确做法:
sigprocmask(BLOCK SIGIO) ← 屏蔽SIGIO
while (nqueue == 0) ← 此时SIGIO不会打断检查
sigsuspend(zeromask) ← 原子地:解除屏蔽+睡眠,SIGIO来了被唤醒
唤醒后sigsuspend自动恢复屏蔽SIGIO
25.5 本章总结
| 套接字类型 | 推荐使用信号驱动I/O? | 原因 |
|---|---|---|
| UDP | 推荐 | 事件简单(数据到/异步错误),处理明确 |
| 监听TCP | 可以 | 只有新连接一种事件 |
| 已连接TCP | 不推荐 | 触发条件太多,无法区分 |
第26章:线程(Threads)
26.1 为什么要用线程?
fork的问题
传统并发服务器(使用fork):
父进程 子进程
│ │
├─ accept() │
│ │
├─ fork() ─────────────→ ├─ 处理客户端
│ 复制整个地址空间 │
│ 复制所有描述符 │
│ 非常昂贵! │
│ │
├─ close(connfd) ├─ close(listenfd)
│ ├─ str_echo()
│ └─ exit()
fork的两大缺点:
- 开销大:即使有写时复制(copy-on-write)优化,fork仍然需要复制页表、文件描述符表等
- IPC困难:子进程向父进程返回数据需要管道、共享内存等额外机制
线程的优势
线程模型:
进程(共享地址空间)
┌─────────────────────────────────────────┐
│ 全局变量、堆、代码段、文件描述符 │
│ ┌──────────┐ ┌──────────┐ │
│ │ 线程A │ │ 线程B │ │
│ │ 自己的栈 │ │ 自己的栈 │ │
│ │ 自己的TID│ │ 自己的TID│ │
│ │ 自己errno│ │ 自己errno│ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────────┘
线程创建速度比fork快 10~100 倍
线程间共享数据无需IPC,直接访问全局变量
线程共享 vs 私有
| 共享(所有线程) | 私有(每个线程自己的) |
|---|---|
| 进程指令(代码段) | 线程ID(TID) |
| 大部分数据 | 寄存器组(含程序计数器、栈指针) |
| 打开的文件描述符 | 栈(局部变量和返回地址) |
| 信号处理函数和信号处置 | errno |
| 当前工作目录 | 信号掩码 |
| 用户ID和组ID | 优先级 |
26.2 基本线程函数
pthread_create:创建线程
#include <pthread.h>
// 返回0表示成功,返回正数错误码表示失败(注意:不是返回-1!)
int pthread_create(pthread_t *tid, // [输出] 新线程的ID
const pthread_attr_t *attr, // 线程属性,NULL表示默认
void *(*func)(void *), // 线程执行的函数
void *arg); // 传给函数的参数
函数签名理解:
void *(*func)(void *)
↑ ↑ ↑
│ │ └─ 参数是 void*(可以是任意类型的指针)
│ └─────── func 是一个函数指针
└────────────── 返回值是 void*(可以是任意类型的指针)
pthread_join:等待线程结束
类比:pthread_join 对线程,就像 waitpid 对进程。
#include <pthread.h>
int pthread_join(pthread_t tid, // 要等待的线程ID(必须指定,不能"等待任意一个")
void **status); // [输出] 线程的返回值(可为NULL)
pthread_self:获取自身线程ID
#include <pthread.h>
pthread_t pthread_self(void); // 类比进程的 getpid()
pthread_detach:分离线程
joinable(可连接,默认):
线程终止后,保留TID和退出状态,直到其他线程调用pthread_join
类似僵尸进程(必须有人"收尸")
detached(已分离):
线程终止后,所有资源自动释放,无法被join
类似守护进程,适合"做完就完"的后台线程
#include <pthread.h>
int pthread_detach(pthread_t tid);
// 常见用法:线程自己分离自己
pthread_detach(pthread_self());
pthread_exit:线程退出
#include <pthread.h>
void pthread_exit(void *status); // status不能指向局部变量(线程结束后局部变量消失)
线程的三种终止方式:
1. 调用 pthread_exit()
2. 执行函数 return(返回值作为退出状态)
3. 进程调用 exit()(所有线程同时终止)
26.3 str_cli函数的线程版本
回顾:str_cli 的作用是把标准输入发给服务器,同时把服务器的回复打印到标准输出。
设计图
完整代码
// 编译: g++ -o strcli_thread strcli_thread.cpp -lpthread
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#define MAXLINE 4096
// ── 全局变量:两个线程都需要访问 ────────────────────────────
static int sockfd; // TCP套接字(连接到服务器)
static FILE* fp; // 标准输入文件指针
// ── copyto线程:从标准输入读,写到套接字 ─────────────────────
void* copyto(void* arg)
{
char sendline[MAXLINE];
// 循环读取标准输入,每次读一行发给服务器
while (fgets(sendline, MAXLINE, fp) != NULL) {
size_t len = strlen(sendline);
// writen:确保发送了len个字节(循环write直到全部发完)
write(sockfd, sendline, len);
}
// 标准输入到达EOF(用户按Ctrl+D),发送FIN通知服务器"我不再发数据了"
// SHUT_WR:关闭写方向,但仍可接收数据(半关闭)
shutdown(sockfd, SHUT_WR);
return NULL; // 线程正常退出
}
// ── str_cli:主函数调用此函数处理客户端逻辑 ──────────────────
void str_cli(FILE* fp_arg, int sockfd_arg)
{
char recvline[MAXLINE];
pthread_t tid;
// 将参数存入全局变量,copyto线程可以访问
sockfd = sockfd_arg;
fp = fp_arg;
// 创建copyto线程(负责发送方向)
// 主线程负责接收方向(从套接字读 → 打印到stdout)
pthread_create(&tid, NULL, copyto, NULL);
// 主线程:循环从套接字读取服务器的回复并打印
ssize_t n;
while ((n = read(sockfd, recvline, MAXLINE - 1)) > 0) {
recvline[n] = '\0';
fputs(recvline, stdout);
}
// 当read返回0(服务器关闭连接)或copyto发了FIN后服务器回了FIN
// str_cli函数返回,调用者(main)通常会exit(),结束所有线程
}
int main(int argc, char* argv[])
{
if (argc != 3) {
fprintf(stderr, "用法: %s <IP> <端口>\n", argv[0]);
return 1;
}
int sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(atoi(argv[2]));
inet_pton(AF_INET, argv[1], &servaddr.sin_addr);
connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr));
str_cli(stdin, sock);
close(sock);
return 0;
}
与fork版本的比较:
| 指标 | fork版本 | 线程版本 |
|---|---|---|
| 耗时(实测) | 约9秒 | 约8.5秒 |
| 代码复杂度 | 中等(需要处理管道/信号) | 简单(共享全局变量) |
| 推荐程度 | 可用 | 更推荐(比非阻塞I/O简单得多) |
26.4 TCP回显服务器的线程版本
基本版本(直接传int,有平台限制)
// 编译: g++ -o tcpserv_thread01 tcpserv_thread01.cpp -lpthread
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#define MAXLINE 4096
// 每个线程执行的函数:回显客户端发来的数据
void* doit(void* arg)
{
// 把 void* 转回 int(连接描述符)
// 注意:这依赖于 sizeof(int) <= sizeof(void*),大多数Unix系统满足
int connfd = (int)(long)arg;
// 线程分离自己:结束后资源自动释放,无需主线程join
pthread_detach(pthread_self());
// 回显逻辑
char buf[MAXLINE];
ssize_t n;
while ((n = read(connfd, buf, MAXLINE)) > 0)
write(connfd, buf, n);
// 必须关闭connfd!
// 原因:线程和主线程共享所有描述符,
// 主线程不会调用close(connfd)(不像fork那样子进程exit时自动关闭)
close(connfd);
return NULL;
}
int main(int argc, char* argv[])
{
if (argc != 2) {
fprintf(stderr, "用法: %s <端口>\n", argv[0]);
return 1;
}
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(atoi(argv[1]));
bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
listen(listenfd, 10);
struct sockaddr_in cliaddr;
socklen_t len;
pthread_t tid;
for (;;) {
len = sizeof(cliaddr);
int connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &len);
// 把 connfd 直接强转为 void* 传给线程
// (而不是传 &connfd,否则多个线程会读到同一个会被覆盖的变量)
pthread_create(&tid, NULL, doit, (void*)(long)connfd);
// 主线程不关闭connfd!
// fork模型中父进程需要close(connfd)来减少引用计数
// 线程模型中创建线程不影响描述符引用计数,主线程关闭会直接断开连接
}
return 0;
}
为什么不能传 &connfd?(常见错误)
错误做法:Pthread_create(&tid, NULL, doit, &connfd);
时序问题:
主线程: accept() → connfd=5 → pthread_create(arg=&connfd) → accept() → connfd=6
↓
新线程: connfd = *(int*)arg = ???
若主线程已经把connfd改成6,这里读到的就是6,不是5!
两个线程都在操作描述符6,描述符5被泄漏了!
安全版本(每次malloc独立空间)
// 编译: g++ -o tcpserv_thread02 tcpserv_thread02.cpp -lpthread
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#define MAXLINE 4096
void* doit(void* arg)
{
// 从堆上取出connfd的值,然后立即释放堆内存
// 这样做是安全的:每次accept后都malloc了一个新的int,
// 所以每个线程拿到的是独立的内存地址,不会互相覆盖
int connfd = *((int*)arg);
free(arg); // 释放主线程malloc的内存(POSIX要求malloc/free是线程安全的)
pthread_detach(pthread_self());
char buf[MAXLINE];
ssize_t n;
while ((n = read(connfd, buf, MAXLINE)) > 0)
write(connfd, buf, n);
close(connfd);
return NULL;
}
int main(int argc, char* argv[])
{
if (argc != 2) {
fprintf(stderr, "用法: %s <端口>\n", argv[0]);
return 1;
}
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(atoi(argv[1]));
bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
listen(listenfd, 10);
struct sockaddr_in cliaddr;
socklen_t len;
pthread_t tid;
for (;;) {
len = sizeof(cliaddr);
// 关键:每次accept前malloc一个新的int
// 这给每个线程提供了独立的内存地址存放connfd
int* iptr = (int*)malloc(sizeof(int));
*iptr = accept(listenfd, (struct sockaddr*)&cliaddr, &len);
// 传指针(指向独立的int),线程内部读取后立即free
pthread_create(&tid, NULL, doit, iptr);
}
return 0;
}
线程安全函数
POSIX要求线程不安全(不能在多线程中使用)的函数:
| 不安全函数 | 线程安全替代版 | 说明 |
|---|---|---|
asctime |
asctime_r |
用静态缓冲区存结果 |
ctime |
ctime_r |
用静态缓冲区存结果 |
gmtime |
gmtime_r |
用静态缓冲区存结果 |
localtime |
localtime_r |
用静态缓冲区存结果 |
strtok |
strtok_r |
用静态状态变量 |
gethostbyname |
无标准替代 | 用静态缓冲区存结果 |
inet_ntoa |
inet_ntop |
用静态缓冲区存结果 |
26.5 线程私有数据(Thread-Specific Data)
问题:静态变量在多线程中的危险
以 readline 函数为例,原版用静态变量缓存已读数据:
// 原版 readline(线程不安全)
static int rl_cnt = 0; // 剩余字节数
static char* rl_bufptr = NULL; // 当前读取位置
static char rl_buf[4096]; // 内部缓冲区
// 问题:这些静态变量被所有线程共享!
// 线程A在读,线程B也在读,两者会互相破坏对方的状态
解决方案:线程私有数据
核心思想: 让每个线程都有自己独立的一份 rl_cnt、rl_bufptr、rl_buf,
但对调用者来说,调用方式完全不变(还是调 readline(fd, buf, len))。
系统内部实现原理
进程级别的 Key 数组(最多128个):
┌─────────┬──────────────┐
│ Key[0] │ flag │ destr │ ← flag:是否在用 destr:析构函数指针
├─────────┼──────────────┤
│ Key[1] │ 1 │ free │ ← readline 使用 Key[1]
├─────────┼──────────────┤
│ ... │ ... │ ... │
└─────────┴──────────────┘
每个线程自己的 Pthread 结构(包含 pkey 数组):
线程0: 线程1:
pkey[0] = NULL pkey[0] = NULL
pkey[1] = → [rl_cnt=3, pkey[1] = → [rl_cnt=0,
rl_buf=..., rl_buf=...,
rl_bufptr=...] rl_bufptr=...]
pkey[2] = NULL pkey[2] = NULL
... ...
pkey[key] 就是线程的"私有数据指针",每个线程独立,互不影响
使用流程
完整线程安全的 readline 实现
// 编译: g++ -o readline_safe readline_safe.cpp -lpthread
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <cstdlib>
#include <cstring>
#include <cstdio>
#define MAXLINE 4096
// ── 线程私有数据的 key(进程全局,所有线程共用同一个key值)──
static pthread_key_t rl_key;
// ── pthread_once 控制变量:确保 readline_once 只被调用一次 ──
static pthread_once_t rl_once = PTHREAD_ONCE_INIT;
// ── 每个线程私有的状态结构 ──────────────────────────────────
// 原来是3个静态变量,现在打包成结构体,每线程一份
typedef struct {
int rl_cnt; // 内部缓冲区中剩余的字节数(初始化为0)
char* rl_bufptr; // 指向内部缓冲区中下一个字节的指针
char rl_buf[MAXLINE]; // 内部缓冲区(避免每次都调用read系统调用)
} Rline;
// ── 析构函数:线程退出时自动调用,释放私有数据 ─────────────
static void readline_destructor(void* ptr)
{
free(ptr); // ptr 就是该线程的 Rline* 指针
}
// ── 一次性初始化函数:创建 key ───────────────────────────────
static void readline_once(void)
{
// 创建 key,并注册析构函数
// 线程退出时若 pkey[rl_key] != NULL,系统会调用 readline_destructor
pthread_key_create(&rl_key, readline_destructor);
}
// ── 内部读函数:从内部缓冲区读一个字节 ─────────────────────
static ssize_t my_read(Rline* tsd, int fd, char* ptr)
{
if (tsd->rl_cnt <= 0) {
// 内部缓冲区空了,重新从fd读一批数据
again:
tsd->rl_cnt = read(fd, tsd->rl_buf, MAXLINE);
if (tsd->rl_cnt < 0) {
if (errno == EINTR) goto again;
return -1;
} else if (tsd->rl_cnt == 0) {
return 0; // EOF
}
tsd->rl_bufptr = tsd->rl_buf; // 重置读指针
}
tsd->rl_cnt--;
*ptr = *tsd->rl_bufptr++; // 取出一个字节
return 1;
}
// ── 线程安全的 readline ─────────────────────────────────────
ssize_t readline(int fd, void* vptr, size_t maxlen)
{
// 确保 readline_once(创建key)只被调用一次
// 无论多少个线程同时调用 readline,readline_once 只执行一次
pthread_once(&rl_once, readline_once);
// 获取本线程的私有 Rline 结构指针
Rline* tsd = (Rline*)pthread_getspecific(rl_key);
if (tsd == NULL) {
// 本线程第一次调用 readline,分配私有数据
// calloc 会把内存清零,所以 rl_cnt 初始化为0,rl_bufptr 为NULL
tsd = (Rline*)calloc(1, sizeof(Rline));
tsd->rl_bufptr = tsd->rl_buf; // 初始化指针
// 把指针存入本线程的 pkey[rl_key]
pthread_setspecific(rl_key, tsd);
}
// 以下逻辑与原版 readline 相同,但使用 tsd 中的变量而不是静态变量
size_t n;
ssize_t rc;
char c, *ptr = (char*)vptr;
for (n = 1; n < maxlen; n++) {
rc = my_read(tsd, fd, &c);
if (rc == 1) {
*ptr++ = c;
if (c == '\n') break;
} else if (rc == 0) {
*ptr = 0;
return n - 1; // EOF,返回已读字节数
} else {
return -1; // 错误
}
}
*ptr = 0;
return n;
}
// ── 测试 main ────────────────────────────────────────────────
void* thread_func(void* arg)
{
int fd = (int)(long)arg;
char line[MAXLINE];
ssize_t n;
while ((n = readline(fd, line, MAXLINE)) > 0) {
printf("线程 %lu 读到: %s", pthread_self(), line);
}
return NULL;
}
int main()
{
printf("线程安全的 readline 已定义,可在服务器中使用。\n");
printf("每个线程第一次调用时会分配独立的 Rline 缓冲区,\n");
printf("线程退出时 readline_destructor 自动释放内存。\n");
return 0;
}
26.6 互斥锁(Mutex)
共享变量的竞态条件
问题场景:两个线程同时递减全局变量 nconn
C语句: nconn--
编译后的机器指令(以3条为例):
指令1: LOAD reg, nconn // 从内存读到寄存器
指令2: DEC reg // 寄存器减1
指令3: STORE nconn, reg // 写回内存
错误时序:
时刻1: 线程A 执行 LOAD → reg_A = 3(nconn当前值为3)
时刻2: 调度切换到线程B
时刻3: 线程B 执行 LOAD → reg_B = 3
时刻4: 线程B 执行 DEC → reg_B = 2
时刻5: 线程B 执行 STORE → nconn = 2
时刻6: 调度切换回线程A
时刻7: 线程A 执行 DEC → reg_A = 2(注意!A的寄存器保存的还是3)
时刻8: 线程A 执行 STORE → nconn = 2 ← 错误!应该是1
结果:nconn 应该是1,实际是2
互斥锁的使用
// 编译: g++ -o mutex_demo mutex_demo.cpp -lpthread
#include <pthread.h>
#include <cstdio>
#include <cstdlib>
#define NLOOP 5000
int counter = 0;
// 互斥锁:静态分配时必须用此宏初始化
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
void* doit(void* vptr)
{
int val;
for (int i = 0; i < NLOOP; i++) {
// ── 加锁:如果锁被其他线程持有,当前线程阻塞等待 ─────
pthread_mutex_lock(&counter_mutex);
// ── 临界区:只有持有锁的线程才能执行这段代码 ──────────
val = counter;
printf("%lu: %d\n", pthread_self(), val + 1);
counter = val + 1;
// ── 解锁:释放锁,其他等待线程可以继续 ────────────────
pthread_mutex_unlock(&counter_mutex);
}
return NULL;
}
int main()
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, doit, NULL);
pthread_create(&tidB, NULL, doit, NULL);
// 等待两个线程都完成
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
printf("最终 counter = %d(应该是 %d)\n", counter, NLOOP * 2);
return 0;
}
互斥锁的性能开销: 实测约 10%10\%10% 的CPU开销,远不是瓶颈,值得使用。
互斥锁保护的原则
规则:凡是被多个线程读写的变量,都必须用互斥锁保护
例外:只读变量不需要锁(因为只读不会产生数据竞争)
检查清单:
□ 全局变量被多线程访问?→ 加锁
□ 被 malloc 的共享结构?→ 加锁
□ 线程局部变量(栈上)?→ 不需要锁(每个线程有自己的栈)
□ 线程私有数据?→ 不需要锁(每个线程有独立副本)
26.7 条件变量(Condition Variable)
问题:互斥锁不够用
互斥锁只能防止同时访问,但不能让线程"等待某个条件成立"。
轮询方式(浪费CPU):
// 主线程不停地检查 ndone,浪费大量CPU
while (nlefttoread > 0) {
pthread_mutex_lock(&ndone_mutex);
if (ndone > 0) {
// 处理已完成的线程
}
pthread_mutex_unlock(&ndone_mutex);
// 没有sleep,主线程疯狂轮询,CPU占用率100%
}
正确方式:条件变量(让线程睡眠,等待"某个条件"被触发):
条件变量 = 互斥锁 + 等待/通知机制
互斥锁:保护共享变量(ndone)
条件变量:让线程在条件不满足时睡眠,条件满足时被唤醒
关键API
// 等待条件(原子地解锁+睡眠,被唤醒后重新加锁)
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
// 唤醒一个等待线程
int pthread_cond_signal(pthread_cond_t *cptr);
// 唤醒所有等待线程
int pthread_cond_broadcast(pthread_cond_t *cptr);
// 带超时的等待(abstime是绝对时间,不是时间差)
int pthread_cond_timedwait(pthread_cond_t *cptr,
pthread_mutex_t *mptr,
const struct timespec *abstime);
为什么 pthread_cond_wait 必须原子地解锁+睡眠?
错误设计(非原子操作):
pthread_mutex_unlock(&m); ← 解锁后
← 此时其他线程修改ndone并signal,但我们还没开始等!信号丢失!
pthread_cond_wait(&cv, &m); ← 开始等,永远等不到了
正确设计(原子操作):
pthread_cond_wait内部:
1. 解锁mutex ┐
2. 把自己加入等待队列 ├── 原子执行,不会有信号在中间丢失
3. 睡眠 ┘
4. 收到signal后:重新加锁mutex,然后返回
带超时的等待(注意是绝对时间)
// 超时等待示例:等待最多5秒
struct timeval tv;
struct timespec ts;
gettimeofday(&tv, NULL);
// 当前时间 + 5秒 = 超时的绝对时刻
ts.tv_sec = tv.tv_sec + 5;
ts.tv_nsec = tv.tv_usec * 1000; // 微秒转纳秒($1\mu s = 10^3 ns$)
// 如果5秒内没有signal,返回 ETIMEDOUT
pthread_cond_timedwait(&cond, &mutex, &ts);
为什么用绝对时间?
若函数因信号打断而提前返回,可以直接再次调用,不需要重新计算时间差。
26.8 完整示例:Web客户端用条件变量等待线程完成
整体架构
完整代码
// 编译: g++ -o web_threads web_threads.cpp -lpthread
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#define MAXFILES 20
#define MAXLINE 4096
// 每个文件的状态标志
#define F_CONNECTING 1 // 正在connect
#define F_READING 2 // connect完成,正在读
#define F_DONE 4 // 读完了
#define F_JOINED 8 // 主线程已经pthread_join
// 每个要下载的文件的信息
struct file {
const char* f_name; // 文件路径(如 "/index.html")
const char* f_host; // 服务器地址
int f_fd; // 连接套接字
int f_flags; // 状态标志
pthread_t f_tid; // 线程ID
};
// ── 全局共享状态 ──────────────────────────────────────────────
static struct file fileinfo[MAXFILES];
static int nfiles; // 总文件数
static int nconn; // 当前活跃连接数
static int nlefttoconn; // 还剩多少文件需要建立连接
static int nlefttoread; // 还剩多少文件没有读完
// ── 条件变量 + 互斥锁(保护ndone)───────────────────────────
static int ndone = 0;
static pthread_mutex_t ndone_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ndone_cond = PTHREAD_COND_INITIALIZER;
// ── 每个线程执行的函数:建连+发GET+读响应 ────────────────────
void* do_get_read(void* vptr)
{
struct file* fptr = (struct file*)vptr;
// 建立TCP连接(阻塞模式,线程会在此等待,不影响其他线程)
int fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(80);
inet_pton(AF_INET, fptr->f_host, &servaddr.sin_addr);
connect(fd, (struct sockaddr*)&servaddr, sizeof(servaddr));
fptr->f_fd = fd;
printf("线程 %lu: 已连接到 %s,下载 %s\n",
pthread_self(), fptr->f_host, fptr->f_name);
// 发送HTTP GET请求
char request[MAXLINE];
snprintf(request, sizeof(request),
"GET %s HTTP/1.0\r\nHost: %s\r\n\r\n",
fptr->f_name, fptr->f_host);
write(fd, request, strlen(request));
fptr->f_flags = F_READING;
// 读取响应直到服务器关闭连接
char buf[MAXLINE];
ssize_t n;
long total = 0;
while ((n = read(fd, buf, MAXLINE)) > 0)
total += n;
printf("线程 %lu: %s 读取完毕,共 %ld 字节\n",
pthread_self(), fptr->f_name, total);
close(fd);
// ── 通知主线程:本线程完成了 ─────────────────────────────
pthread_mutex_lock(&ndone_mutex);
fptr->f_flags = F_DONE; // 标记完成
ndone++; // 完成计数加1
pthread_cond_signal(&ndone_cond); // 唤醒主线程
pthread_mutex_unlock(&ndone_mutex);
return fptr; // 返回值供主线程通过pthread_join获取
}
int main(int argc, char* argv[])
{
if (argc < 4) {
fprintf(stderr, "用法: %s <最大并发数> <IP> <文件1> [文件2...]\n", argv[0]);
return 1;
}
int maxnconn = atoi(argv[1]);
const char* host = argv[2];
nfiles = 0;
for (int i = 3; i < argc && nfiles < MAXFILES; i++, nfiles++) {
fileinfo[nfiles].f_name = argv[i];
fileinfo[nfiles].f_host = host;
fileinfo[nfiles].f_flags = 0;
}
nlefttoread = nlefttoconn = nfiles;
nconn = 0;
pthread_t tid;
while (nlefttoread > 0) {
// ── 如果可以,创建新线程 ──────────────────────────────
while (nconn < maxnconn && nlefttoconn > 0) {
// 找一个还没开始下载的文件
int i;
for (i = 0; i < nfiles; i++)
if (fileinfo[i].f_flags == 0) break;
fileinfo[i].f_flags = F_CONNECTING;
pthread_create(&tid, NULL, do_get_read, &fileinfo[i]);
fileinfo[i].f_tid = tid;
nconn++;
nlefttoconn--;
}
// ── 等待至少一个线程完成 ─────────────────────────────
pthread_mutex_lock(&ndone_mutex);
// 用while而不是if:防止虚假唤醒(spurious wakeup)
while (ndone == 0)
pthread_cond_wait(&ndone_cond, &ndone_mutex);
// pthread_cond_wait 做三件事(原子):
// 1. 解锁 ndone_mutex
// 2. 睡眠等待 ndone_cond 被signal
// 3. 被唤醒后重新加锁 ndone_mutex
// 找到所有已完成(F_DONE)的线程,逐个join
struct file* fptr;
for (int i = 0; i < nfiles; i++) {
if (fileinfo[i].f_flags & F_DONE) {
pthread_join(fileinfo[i].f_tid, (void**)&fptr);
// 验证:join返回的指针应该就是 &fileinfo[i]
fileinfo[i].f_flags = F_JOINED;
ndone--;
nconn--;
nlefttoread--;
printf("主线程: %s 已完成并join\n", fptr->f_name);
}
}
pthread_mutex_unlock(&ndone_mutex);
}
printf("所有文件下载完毕\n");
return 0;
}
条件变量使用模式总结
生产者(工作线程): 消费者(主线程):
lock(mutex) lock(mutex)
修改共享状态(ndone++) while (条件不满足)
signal(cond) cond_wait(cond, mutex)
unlock(mutex) 处理共享状态
unlock(mutex)
关键点:
1. 修改共享状态必须在 lock 内
2. signal 可以在 lock 内或 lock 外(通常在 lock 内)
3. wait 必须在 lock 内,且用 while 检查条件(防虚假唤醒)
4. wait 原子地解锁+睡眠,唤醒后原子地加锁+返回
26.9 本章总结
最佳实践建议:
- 优先使用线程而非非阻塞I/O(代码简单得多)
- 共享变量一定要用互斥锁保护,不要侥幸
- 等待条件一定要用
while检查(不用if),防止虚假唤醒 - 调用函数前先确认是否线程安全,不安全的用
_r后缀版本 - 每个线程用完的描述符自己关,不依赖主线程
附:关键API速查
信号驱动I/O相关:
signal(SIGIO, handler) 设置SIGIO处理函数
fcntl(fd, F_SETOWN, getpid()) 设置套接字属主
ioctl(fd, FIOASYNC, &on) 开启信号驱动I/O
ioctl(fd, FIONBIO, &on) 设置非阻塞(配合循环读)
sigsuspend(&zeromask) 原子地解除屏蔽+睡眠+唤醒后恢复屏蔽
线程基础:
pthread_create(&tid, NULL, func, arg) 创建线程
pthread_join(tid, &retval) 等待特定线程
pthread_detach(pthread_self()) 自我分离
pthread_exit(retval) 退出线程
同步原语:
pthread_mutex_lock(&m) 加锁(阻塞等待)
pthread_mutex_unlock(&m) 解锁
pthread_cond_wait(&cv, &m) 等待条件(原子解锁+睡眠)
pthread_cond_signal(&cv) 唤醒一个等待者
pthread_cond_broadcast(&cv) 唤醒所有等待者
线程私有数据:
pthread_once(&oc, init_func) 确保init_func只执行一次
pthread_key_create(&key, dtor) 创建key(注册析构函数)
pthread_setspecific(key, ptr) 设置本线程的私有指针
pthread_getspecific(key) 获取本线程的私有指针
IP 选项与原始套接字详解
本文对应《Unix网络编程》第27、28章,用尽可能通俗的语言讲解 IPv4/IPv6 的选项机制与原始套接字编程。
第27章 IP 选项(IP Options)
27.1 总体概念
IP 选项就像是快递单上的"备注栏"——普通情况下不需要,但有时需要告诉路由器"请走这条路"或"请每个路由器都看一眼我"。
- IPv4 的选项通过
IP_OPTIONS套接字选项来读写,格式直接对应数据包里的字节布局。 - IPv6 的选项通过一套函数接口来操作,程序员不需要关心底层字节格式。
27.2 IPv4 的 10 种选项
IPv4 头部固定 20 字节,后面最多可以跟 40 字节的选项区(因为头部长度字段是 4 位,最大值 15,即 15×4=6015 \times 4 = 6015×4=60 字节,减去固定头 60−20=4060 - 20 = 4060−20=40 字节)。
| 编号 | 名称 | 说明 |
|---|---|---|
| 1 | NOP | 填充对齐用,1字节 |
| 2 | EOL | 选项列表结束标志,1字节 |
| 3 | LSRR | 宽松源路由(路径灵活) |
| 4 | SSRR | 严格源路由(路径严格) |
| 5 | Timestamp | 时间戳 |
| 6 | Record Route | 记录路由 |
| 7 | 基本安全 | 已废弃 |
| 8 | 扩展安全 | 已废弃 |
| 9 | 流标识 | 已废弃 |
| 10 | Router Alert | 让所有路由器都检查此包 |
读写方式:
// 设置选项:把 options_buf 中的选项安装到套接字
setsockopt(sockfd, IPPROTO_IP, IP_OPTIONS, options_buf, buf_len);
// 清除选项:传 NULL 或长度为 0
setsockopt(sockfd, IPPROTO_IP, IP_OPTIONS, NULL, 0);
// 读取选项(对已连接 TCP 套接字,返回的是反转后的源路由)
int len = 44;
getsockopt(sockfd, IPPROTO_IP, IP_OPTIONS, buf, &len);
27.3 IPv4 源路由选项
什么是源路由?
正常情况下,路由器自己决定数据包走哪条路。源路由是发送方"钦点"路径,让包按指定节点顺序转发。
宽松源路由(LSRR):必须经过列出的节点,但中间可以绕路
严格源路由(SSRR):只能经过列出的节点,不能绕路
缓冲区格式(传给 setsockopt 的格式)
44 字节缓冲区布局:
+-----+------+-----+-----+----------+----------+-----+----------+----------+
| NOP | code | len | ptr | IP addr1 | IP addr2 | ... | IP addr9 | dest IP |
+-----+------+-----+-----+----------+----------+-----+----------+----------+
1B 1B 1B 1B 4B 4B 4B 4B
NOP(0x00):填充,让后续 IP 地址对齐到 4 字节边界code:0x83 = LSRR,0x89 = SSRRlen:选项总字节数(含3字节头和末尾目标地址),公式为 len=3+n×4len = 3 + n \times 4len=3+n×4,nnn 为地址个数ptr:指向下一个待处理地址的偏移,初始为 4(指向第一个地址)
工作原理
发送方把路径写进选项:
源 → A → B → C → 目标
数据包经过每个节点时,节点把自己的地址替换进去(记录路径),
同时 ptr 增加 4,指向下一跳。
到达目标后,目标反转列表,就知道怎么回去。
用 ASCII 图演示:
发送时的选项(ptr=4,指向A):
[NOP][code][len][ptr=4][ A ][ B ][ C ]
^
经过A路由器后(ptr=8,指向B):
[NOP][code][len][ptr=8][A的出口IP][ B ][ C ]
^
经过B路由器后(ptr=12,指向C):
[NOP][code][len][ptr=12][A出口][B出口][ C ]
^
源路由三个核心函数的 C++ 完整示例
/*
* 演示:构造一个 LSRR(宽松源路由)选项并发送
* 编译:g++ -o srcrt_demo srcrt_demo.cpp
*/
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
// 选项类型代码
#define IPOPT_NOP 0x01 // 空操作(填充用)
#define IPOPT_LSRR 0x83 // 宽松源路由
#define IPOPT_SSRR 0x89 // 严格源路由
// 静态变量:在构建选项时使用
static unsigned char *optr; // 当前写入位置
static unsigned char *lenptr; // 指向 len 字段的指针
static int ocnt; // 已添加的地址个数
/*
* inet_srcrt_init: 初始化源路由选项缓冲区
* 参数 type: 0 = LSRR(宽松),1 = SSRR(严格)
* 返回: 指向缓冲区起始位置的指针(供 setsockopt 使用)
*/
unsigned char *inet_srcrt_init(int type)
{
// 分配最大 44 字节(1 NOP + 3字节头 + 最多10个地址×4字节)
optr = (unsigned char *)malloc(44);
memset(optr, 0, 44); // 全部置0,相当于 EOL 填充
ocnt = 0;
// 写入 NOP(让后面的 IP 地址对齐到4字节边界)
*optr++ = IPOPT_NOP;
// 写入选项类型(LSRR 或 SSRR)
*optr++ = (type ? IPOPT_SSRR : IPOPT_LSRR);
// 保存 len 字段的位置,稍后填入
lenptr = optr++;
// ptr 字段初始值为 4(指向第一个 IP 地址)
*optr++ = 4;
// 返回 NOP 之前的位置(即缓冲区开头),供 setsockopt 使用
return (optr - 4);
}
/*
* inet_srcrt_add: 向源路由选项中追加一个 IP 地址
* 参数 addr: 点分十进制字符串,如 "192.168.1.1"
* 返回: 缓冲区当前总大小(供 setsockopt 的第5个参数使用)
*/
int inet_srcrt_add(const char *addr)
{
if (ocnt > 9) {
fprintf(stderr, "too many source route hops\n");
return -1;
}
// 将点分十进制转为二进制
struct in_addr in;
if (inet_pton(AF_INET, addr, &in) != 1) {
fprintf(stderr, "invalid address: %s\n", addr);
return -1;
}
// 将 IP 地址(4字节)写入缓冲区
memcpy(optr, &in, sizeof(struct in_addr));
optr += sizeof(struct in_addr);
ocnt++;
// 更新 len 字段:3字节头 + ocnt个地址×4字节
// 公式:len = 3 + ocnt * 4
int len = 3 + (ocnt * (int)sizeof(struct in_addr));
*lenptr = (unsigned char)len;
// 返回总缓冲区大小(含开头的 NOP,所以 +1)
return (len + 1);
}
/*
* 主程序演示:向目标主机发送一个带源路由的 UDP 数据包
*/
int main(int argc, char *argv[])
{
if (argc < 3) {
fprintf(stderr, "用法: %s <中间hop> <目标IP>\n", argv[0]);
fprintf(stderr, "示例: %s 192.168.1.1 10.0.0.1\n", argv[0]);
return 1;
}
// 创建 UDP 套接字
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("socket");
return 1;
}
// 初始化 LSRR 源路由选项
unsigned char *ptr = inet_srcrt_init(0); // 0 = LSRR
// 添加中间跳节点(argv[1])
int len = inet_srcrt_add(argv[1]);
// 添加目标地址(argv[2])作为路由中最后一跳
len = inet_srcrt_add(argv[2]);
if (len < 0) {
fprintf(stderr, "添加地址失败\n");
close(sockfd);
return 1;
}
// 安装源路由选项到套接字
// 此后从这个套接字发出的所有 IP 数据包都会携带该选项
if (setsockopt(sockfd, IPPROTO_IP, IP_OPTIONS, ptr, len) < 0) {
perror("setsockopt IP_OPTIONS");
free(ptr);
close(sockfd);
return 1;
}
free(ptr);
// 设置目标地址(实际上第一跳是 argv[1],但 connect 写目标 argv[2])
struct sockaddr_in dest;
memset(&dest, 0, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_port = htons(9999);
inet_pton(AF_INET, argv[2], &dest.sin_addr);
// 发送测试数据
const char *msg = "Hello with source route!";
ssize_t n = sendto(sockfd, msg, strlen(msg), 0,
(struct sockaddr *)&dest, sizeof(dest));
if (n < 0) {
perror("sendto");
} else {
printf("发送了 %zd 字节,携带 LSRR 源路由选项\n", n);
printf("路由路径: 本机 -> %s -> %s\n", argv[1], argv[2]);
}
close(sockfd);
return 0;
}
安全隐患
源路由有严重的安全问题:
正常情况:
攻击者(192.168.1.100) ---X--- 被信任主机(10.0.0.1)
|
服务器
利用源路由欺骗:
攻击者 ----[伪造源IP=10.0.0.1, 源路由经过攻击者]----> 服务器
服务器回包 ----[按源路由]----> 攻击者
攻击者看到了 TCP 序列号,可以伪造后续包!
解决方案: 凡是收到带源路由的连接,直接拒绝(不能只清除选项,因为握手第二步已经按源路由发出去了)。
27.4 IPv6 扩展头
IPv6 头部固定 40 字节,不支持选项。但允许在固定头和传输层头之间插入"扩展头":
[IPv6固定头 40B][扩展头1][扩展头2]...[TCP/UDP头][数据]
六种扩展头:
| 扩展头 | 说明 |
|---|---|
| 逐跳选项 | 必须紧跟固定头,每个路由器都要处理 |
| 目的地选项 | 只有最终目标处理 |
| 路由头 | 类似IPv4源路由 |
| 分片头 | 内核自动处理 |
| 认证头(AH) | IPSec,内核处理 |
| 封装安全载荷(ESP) | IPSec,内核处理 |
27.5 IPv6 逐跳选项与目的地选项
TLV 编码格式
每个单独的选项使用 类型-长度-值(TLV) 格式:
+------+--------+--------------------+
| type | length| value |
+------+--------+--------------------+
1字节 1字节 length 字节
type 的高两位决定"不认识时怎么办":
高位={00跳过,继续处理01丢弃包10丢弃并发ICMP错误(含组播目标)11丢弃并发ICMP错误(非组播目标) \text{高位} = \begin{cases} 00 & \text{跳过,继续处理} \\ 01 & \text{丢弃包} \\ 10 & \text{丢弃并发ICMP错误(含组播目标)} \\ 11 & \text{丢弃并发ICMP错误(非组播目标)} \end{cases} 高位=⎩
⎨
⎧00011011跳过,继续处理丢弃包丢弃并发ICMP错误(含组播目标)丢弃并发ICMP错误(非组播目标)
构建选项的四个函数
inet6_opt_init → 初始化扩展头缓冲区,返回空头所需字节数
inet6_opt_append → 追加一个选项,返回更新后的总长度
inet6_opt_finish → 完成扩展头(补齐到8字节倍数),返回最终长度
inet6_opt_set_val → 把选项值写入 append 返回的数据缓冲区
两遍扫描模式(推荐用法):
第一遍(计算大小,extbuf=NULL):
inet6_opt_init(NULL, 0)
inet6_opt_append(NULL, 0, ...) ← 每个选项调一次
inet6_opt_finish(NULL, 0, ...) ← 返回总大小
malloc(总大小)
第二遍(真正填充):
inet6_opt_init(buf, size)
inet6_opt_append(buf, size, ...)
inet6_opt_set_val(...) ← 填入选项值
inet6_opt_finish(buf, size, ...)
27.6 IPv6 路由头(源路由)
IPv6 的路由头格式:
+----------+------------------+----------+--------------+
|next_hdr | hdr_ext_len |routing | segments_left|
+----------+------------------+----------+--------------+
| reserved (32位) |
+-------------------------------------------------------+
| address 1 |
+-------------------------------------------------------+
| address 2 |
+-------------------------------------------------------+
| ... |
+-------------------------------------------------------+
三个构建函数:
// 计算需要多少字节
socklen_t inet6_rth_space(int type, int segments);
// 初始化路由头缓冲区
void *inet6_rth_init(void *rthbuf, socklen_t rthlen, int type, int segments);
// 追加一个IPv6地址
int inet6_rth_add(void *rthbuf, const struct in6_addr *addr);
三个处理收到的路由头的函数:
// 反转路由(用于回程)
int inet6_rth_reverse(const void *in, void *out);
// 获取段数
int inet6_rth_segments(const void *rthbuf);
// 获取第i个地址
struct in6_addr *inet6_rth_getaddr(const void *rthbuf, int index);
27.7 IPv6 粘性选项(Sticky Options)
“粘性选项"就是"一次设置,每包生效”。与其每次 sendmsg 都带上辅助数据,不如用 setsockopt 设置一次:
// 用 setsockopt 设置粘性路由头(对所有后续包生效)
setsockopt(sockfd, IPPROTO_IPV6, IPV6_RTHDR, ptr, len);
// 如果某次发送需要覆盖,在 sendmsg 的辅助数据中指定即可
// 该次发送会用辅助数据中的选项,忽略粘性选项
七种可以设置为粘性选项的对象:
| 粘性选项常量 | 说明 |
|---|---|
| IPV6_PKTINFO | 目标地址 + 出接口 |
| IPV6_HOPLIMIT | 跳数限制 |
| IPV6_NEXTHOP | 下一跳地址 |
| IPV6_TCLASS | 流量类别 |
| IPV6_HOPOPTS | 逐跳选项 |
| IPV6_DSTOPTS | 目的地选项 |
| IPV6_RTHDR | 路由头 |
第28章 原始套接字(Raw Sockets)
28.1 为什么需要原始套接字?
普通的 TCP/UDP 套接字只能处理应用层数据,内核帮你处理下面的协议头。原始套接字让你直接操作网络层,像内核中的协议模块一样工作。
三大能力:
1. 读写 ICMPv4 / IGMPv4 / ICMPv6 包
→ ping 程序就是这样工作的
2. 读写内核不认识的 IP 协议包
→ OSPF 协议(协议号89)没有 TCP/UDP 封装,直接用 IP
→ gated 路由程序用此方式收发 OSPF 包
3. 自己构建 IPv4 头部(IP_HDRINCL 选项)
→ traceroute 早期版本用此方法设置 TTL
→ 可以构建任意 UDP/TCP 包用于测试
28.2 创建原始套接字
// 创建 IPv4 原始套接字(需要 root 权限)
int sockfd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
// ^地址族 ^套接字类型 ^协议(ICMP=1, IGMP=2, ...)
// 可选:让自己构建 IP 头部
const int on = 1;
setsockopt(sockfd, IPPROTO_IP, IP_HDRINCL, &on, sizeof(on));
注意: 只有超级用户(root)才能创建原始套接字。创建完之后应立即降权:
sockfd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
setuid(getuid()); // 从 root 降回普通用户
28.3 原始套接字发送规则
不设置 IP_HDRINCL(内核构建IP头)
你的数据: [ICMP头][ICMP数据]
内核加上: [IPv4头 20B][ICMP头][ICMP数据]
设置 IP_HDRINCL(你构建IP头)
你的数据: [IPv4头][载荷]
↓ 内核只修改:校验和 + 可能改ID字段
→ 你必须自己计算载荷的校验和(如 ICMP 校验和)
重要字节序差异:
| 系统 | ip_len, ip_off | 其他字段 |
|---|---|---|
| BSD 派生 | 主机字节序 | 网络字节序 |
| Linux / OpenBSD | 全部网络字节序 | 网络字节序 |
28.4 原始套接字接收规则
哪些包会被传给原始套接字?
不会传递:
✗ TCP 包(内核完全处理)
✗ UDP 包(内核完全处理)
会传递:
✓ 大多数 ICMP 包(除了 echo request、timestamp request、地址掩码请求这三种由内核自己回复的)
✓ 全部 IGMP 包
✓ 内核不认识协议号的 IP 包
✓ 已重组完成的分片包(分片期间不传)
匹配规则(三个条件全满足才投递):
投递条件={协议号匹配(若创建时指定了非0协议)目标IP匹配(若调用了 bind)源IP匹配(若调用了 connect) \text{投递条件} = \begin{cases} \text{协议号匹配(若创建时指定了非0协议)} \\ \text{目标IP匹配(若调用了 bind)} \\ \text{源IP匹配(若调用了 connect)} \end{cases} 投递条件=⎩
⎨
⎧协议号匹配(若创建时指定了非0协议)目标IP匹配(若调用了 bind)源IP匹配(若调用了 connect)
如果创建套接字时协议为 0 且没有 bind/connect,则收到所有原始包的副本。
ICMPv6 过滤器
ICMPv6 比 ICMPv4 消息更多(包含了ARP和IGMP的功能),为避免程序被大量无关包淹没,可以设置过滤器:
struct icmp6_filter myfilt;
// 先屏蔽全部
ICMP6_FILTER_SETBLOCKALL(&myfilt);
// 只放行路由器通告
ICMP6_FILTER_SETPASS(ND_ROUTER_ADVERT, &myfilt);
// 安装过滤器
setsockopt(fd, IPPROTO_ICMPV6, ICMP6_FILTER, &myfilt, sizeof(myfilt));
28.5 ping 程序实现
整体架构
ICMP 报文格式
0 7 8 15 16 31
+--------+---------+-------------------------------+
| type | code | checksum | 8字节固定头
+--------+---------+-------------------------------+
| identifier | sequence number |
+------------------+-------------------------------+
| optional data |
| (我们存放发送时间戳,用于计算RTT) |
+-----------------------------------------------+-+
RTT 计算公式:
RTT=Trecv−TsendRTT = T_{recv} - T_{send}RTT=Trecv−Tsend
换算为毫秒:
RTTms=RTTsec×1000+RTTμs1000RTT_{ms} = RTT_{sec} \times 1000 + \frac{RTT_{\mu s}}{1000}RTTms=RTTsec×1000+1000RTTμs
Internet 校验和算法
ICMPv4 需要应用程序自己计算校验和,ICMPv6 由内核计算(因为需要 IPv6 源地址参与计算)。
校验和算法(16位一补码和的一补码):
checksum=∼(∑iwordi)16位\text{checksum} = \sim \left( \sum_{i} \text{word}_{i} \right)_{16\text{位}}checksum=∼(i∑wordi)16位
(求和时进位回卷到低位)
完整 ping 程序(C++,支持 IPv4)
/*
* 简化版 ping 程序(仅 IPv4)
* 演示原始套接字 + ICMP 的使用
*
* 编译:g++ -o myping myping.cpp
* 运行(需要 root):sudo ./myping 8.8.8.8
*/
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <csignal>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/ip_icmp.h>
#include <arpa/inet.h>
#include <netdb.h>
// ===== 全局变量 =====
static int g_sockfd; // 原始套接字描述符
static int g_nsent = 0; // 已发送包数
static pid_t g_pid; // 进程ID(放入 ICMP identifier 字段)
static struct sockaddr_in g_dest; // 目标地址
// ===== Internet 校验和计算 =====
/*
* 算法:把所有16位字相加(超过16位的进位加回低位),最后取反
* addr: 数据起始地址
* len: 字节数
*/
static uint16_t in_cksum(const uint16_t *addr, int len)
{
uint32_t sum = 0;
int nleft = len;
const uint16_t *w = addr;
uint16_t answer = 0;
// 每次累加2字节(16位)
while (nleft > 1) {
sum += *w++;
nleft -= 2;
}
// 如果长度是奇数,最后1字节补0处理
if (nleft == 1) {
*(unsigned char *)(&answer) = *(const unsigned char *)w;
sum += answer;
}
// 把高16位的进位加回到低16位(循环直到没有进位)
sum = (sum >> 16) + (sum & 0xffff);
sum += (sum >> 16);
// 取反得到最终校验和
return (uint16_t)(~sum);
}
// ===== 发送 ICMP Echo Request =====
static void send_ping(void)
{
// ICMP 包 = 8字节头 + 56字节数据(存时间戳 + 填充)
static char sendbuf[8 + 56];
struct icmp *icmp = (struct icmp *)sendbuf;
icmp->icmp_type = ICMP_ECHO; // 类型 8:回显请求
icmp->icmp_code = 0; // 代码固定为 0
icmp->icmp_id = (uint16_t)g_pid; // 用PID标识是哪个ping进程
icmp->icmp_seq = (uint16_t)g_nsent++; // 序列号,每发一包加1
// 填充数据区(先填模式,再存时间戳)
memset(icmp->icmp_data, 0xa5, 56);
gettimeofday((struct timeval *)icmp->icmp_data, NULL); // 存当前时间
int len = 8 + 56; // 总长度
// 计算校验和(先将校验和字段置0)
icmp->icmp_cksum = 0;
icmp->icmp_cksum = in_cksum((uint16_t *)icmp, len);
// 发送(内核会自动加上 IPv4 头)
ssize_t n = sendto(g_sockfd, sendbuf, len, 0,
(struct sockaddr *)&g_dest, sizeof(g_dest));
if (n < 0) {
perror("sendto");
}
}
// ===== SIGALRM 信号处理器:每秒发送一个ping =====
static void sig_alrm(int signo)
{
send_ping();
alarm(1); // 1秒后再次触发
(void)signo;
}
// ===== 减法:out -= in(struct timeval 相减)=====
static void tv_sub(struct timeval *out, const struct timeval *in)
{
if ((out->tv_usec -= in->tv_usec) < 0) {
--out->tv_sec;
out->tv_usec += 1000000;
}
out->tv_sec -= in->tv_sec;
}
// ===== 处理收到的 ICMP 包 =====
/*
* 收到的原始包格式:
* [IPv4头 20字节][ICMP头 8字节][ICMP数据]
* ptr 指向 IPv4 头起始
*/
static void proc_v4(const char *ptr, ssize_t len,
const struct sockaddr_in *from,
const struct timeval *tvrecv)
{
const struct ip *ip = (const struct ip *)ptr;
int hlen1 = ip->ip_hl << 2; // IPv4头长度 = ip_hl * 4
// 如果不是 ICMP 协议,忽略
if (ip->ip_p != IPPROTO_ICMP)
return;
const struct icmp *icmp = (const struct icmp *)(ptr + hlen1);
int icmplen = (int)(len - hlen1);
if (icmplen < 8) return; // 包太短
// 只处理回显应答(类型 0)
if (icmp->icmp_type == ICMP_ECHOREPLY) {
// 检查是否是我们发的(通过identifier区分)
if (icmp->icmp_id != (uint16_t)g_pid)
return; // 是别的 ping 进程的应答
if (icmplen < 8 + (int)sizeof(struct timeval))
return; // 数据区太短,无法取出时间戳
// 从 ICMP 数据区取出发送时间戳
struct timeval tvsend;
memcpy(&tvsend, icmp->icmp_data, sizeof(tvsend));
// RTT = 收到时刻 - 发送时刻
struct timeval tvdiff = *tvrecv;
tv_sub(&tvdiff, &tvsend);
double rtt = tvdiff.tv_sec * 1000.0 + tvdiff.tv_usec / 1000.0;
printf("%d bytes from %s: seq=%u, ttl=%d, rtt=%.3f ms\n",
icmplen,
inet_ntoa(from->sin_addr),
icmp->icmp_seq,
ip->ip_ttl,
rtt);
}
}
// ===== 主接收循环 =====
static void readloop(void)
{
char recvbuf[1500];
struct sockaddr_in from;
socklen_t fromlen;
struct timeval tvrecv;
// 发送第一个 ping(之后由 SIGALRM 每秒触发)
sig_alrm(SIGALRM);
// 无限循环,接收所有 ICMP 包
for (;;) {
fromlen = sizeof(from);
ssize_t n = recvfrom(g_sockfd, recvbuf, sizeof(recvbuf), 0,
(struct sockaddr *)&from, &fromlen);
if (n < 0) {
if (errno == EINTR) continue; // 被信号打断,继续
perror("recvfrom");
break;
}
// 记录收到时间
gettimeofday(&tvrecv, NULL);
// 处理 ICMP 包
proc_v4(recvbuf, n, &from, &tvrecv);
}
}
int main(int argc, char *argv[])
{
if (argc != 2) {
fprintf(stderr, "用法: %s <目标IP或主机名>\n", argv[0]);
return 1;
}
g_pid = getpid();
// 解析目标地址
struct addrinfo hints, *res;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
if (getaddrinfo(argv[1], NULL, &hints, &res) != 0) {
fprintf(stderr, "无法解析主机: %s\n", argv[1]);
return 1;
}
memcpy(&g_dest, res->ai_addr, sizeof(g_dest));
freeaddrinfo(res);
char ipstr[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &g_dest.sin_addr, ipstr, sizeof(ipstr));
printf("PING %s (%s): 56 data bytes\n", argv[1], ipstr);
// 创建 ICMP 原始套接字(需要 root)
g_sockfd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
if (g_sockfd < 0) {
perror("socket(需要root权限)");
return 1;
}
// 创建成功后立即降权
setuid(getuid());
// 注册 SIGALRM 处理器
signal(SIGALRM, sig_alrm);
// 进入接收循环
readloop();
close(g_sockfd);
return 0;
}
28.6 traceroute 程序实现
工作原理
发包:UDP,目标端口随机(希望没有程序在监听)
第一轮(TTL=1):
发送方 --[TTL=1]--> 路由器A
路由器A 发现 TTL 变为 0,回 ICMP "time exceeded"
→ 得知第一跳是路由器A
第二轮(TTL=2):
发送方 --[TTL=2]--> 路由器A --[TTL=1]--> 路由器B
路由器B 回 ICMP "time exceeded"
→ 得知第二跳是路由器B
...以此类推,直到:
TTL 足够大,UDP 包到达目标
目标回 ICMP "port unreachable"(因为那个端口没有在监听)
→ 追踪完成!
用 ASCII 演示:
TTL=1: [我] ----> [路由器A] 返回 ICMP time exceeded
TTL=2: [我] ----> [路由器A] ----> [路由器B] 返回 ICMP time exceeded
TTL=3: [我] ----> [路由器A] ----> [路由器B] ----> [目标] 返回 ICMP port unreachable
核心:区分自己的包
多个 traceroute 可能同时运行,如何知道收到的 ICMP 错误是回应自己的包?
ICMP 错误消息里会包含原始 UDP 包的头部(至少 8 字节):
收到的ICMP错误包结构:
[IPv4头][ICMP错误头 8B][引发错误的IPv4头][引发错误的UDP头]
^
从这里检查源端口和目标端口
traceroute 用自己的 PID(的低16位 | 0x8000)作为 UDP 源端口,通过匹配源端口来认领自己的包。
recv_v4 返回值含义
| 返回值 | 含义 |
|---|---|
| -3 | 超时(3秒内没收到回复) |
| -2 | 收到 ICMP time exceeded(打了中间路由器) |
| -1 | 收到 ICMP port unreachable(到达目标!) |
| >= 0 | 其他 ICMP destination unreachable 代码 |
完整 traceroute 核心逻辑(C++)
/*
* 简化版 traceroute 核心逻辑演示(仅 IPv4)
* 编译:g++ -o mytracert mytracert.cpp
* 运行:sudo ./mytracert 8.8.8.8
*/
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <csignal>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/ip_icmp.h>
#include <netinet/udp.h>
#include <arpa/inet.h>
#include <netdb.h>
// ===== 常量和全局变量 =====
#define MAX_TTL 30 // 最大跳数
#define NPROBES 3 // 每个TTL发几个探测包
#define DEST_PORT 33434 // 目标端口起始值(期望没人监听)
#define TIMEOUT_S 3 // 等待回复的超时秒数
static int g_recvfd; // 原始 ICMP 套接字(接收用)
static int g_sendfd; // UDP 套接字(发送探测包用)
static uint16_t g_sport; // 我们的 UDP 源端口(标识进程)
static volatile int g_alarm; // SIGALRM 标志
// Internet 校验和(同 ping 程序)
static uint16_t in_cksum(const uint16_t *addr, int len)
{
uint32_t sum = 0;
while (len > 1) { sum += *addr++; len -= 2; }
if (len) sum += *(const uint8_t *)addr;
sum = (sum >> 16) + (sum & 0xffff);
sum += (sum >> 16);
return (uint16_t)(~sum);
}
// SIGALRM 处理器:只设置标志,让 recvfrom 被 EINTR 打断
static void sig_alrm(int signo) { g_alarm = 1; (void)signo; }
/*
* recv_icmp: 等待并解析 ICMP 回复
* seq: 期望匹配的序列号(对应发送的第几个包)
* dport: 期望匹配的目标端口
* tvrecv: 输出参数,收到包的时间
*
* 返回:-3超时, -2中间路由器, -1到达目标, >=0其他错误码
*/
static int recv_icmp(int seq, uint16_t dport, struct timeval *tvrecv)
{
char buf[1500];
struct sockaddr_in from;
socklen_t fromlen;
ssize_t n;
g_alarm = 0;
alarm(TIMEOUT_S);
for (;;) {
if (g_alarm) return -3; // 超时
fromlen = sizeof(from);
n = recvfrom(g_recvfd, buf, sizeof(buf), 0,
(struct sockaddr *)&from, &fromlen);
if (n < 0) {
if (errno == EINTR) continue; // 被信号打断,检查超时标志
perror("recvfrom");
alarm(0);
return -3;
}
// 解析外层 IPv4 头
struct ip *ip = (struct ip *)buf;
int hlen1 = ip->ip_hl << 2;
// 找到 ICMP 头
struct icmp *icmp = (struct icmp *)(buf + hlen1);
int icmplen = (int)(n - hlen1);
if (icmplen < 8) continue;
// 处理 ICMP Time Exceeded(TTL超限)
if (icmp->icmp_type == ICMP_TIMXCEED &&
icmp->icmp_code == ICMP_TIMXCEED_INTRANS) {
if (icmplen < 8 + 20 + 8) continue;
// 解析内层IP头(引发错误的原始UDP包的IP头)
struct ip *hip = (struct ip *)(buf + hlen1 + 8);
int hlen2 = hip->ip_hl << 2;
// 解析内层UDP头
struct udphdr *udp = (struct udphdr *)(buf + hlen1 + 8 + hlen2);
// 检查源端口和目标端口是否匹配我们发的包
if (hip->ip_p == IPPROTO_UDP &&
udp->uh_sport == htons(g_sport) &&
udp->uh_dport == htons(dport + seq)) {
// 这是回应我们探测包的 time exceeded
// 打印发出ICMP的路由器IP
printf(" %s", inet_ntoa(from.sin_addr));
alarm(0);
gettimeofday(tvrecv, NULL);
return -2; // 打到中间路由器
}
}
// 处理 ICMP Destination Unreachable(目标不可达)
else if (icmp->icmp_type == ICMP_UNREACH) {
if (icmplen < 8 + 20 + 8) continue;
struct ip *hip = (struct ip *)(buf + hlen1 + 8);
int hlen2 = hip->ip_hl << 2;
struct udphdr *udp = (struct udphdr *)(buf + hlen1 + 8 + hlen2);
if (hip->ip_p == IPPROTO_UDP &&
udp->uh_sport == htons(g_sport) &&
udp->uh_dport == htons(dport + seq)) {
printf(" %s", inet_ntoa(from.sin_addr));
alarm(0);
gettimeofday(tvrecv, NULL);
if (icmp->icmp_code == ICMP_UNREACH_PORT)
return -1; // 到达目标(端口不可达)
else
return icmp->icmp_code; // 其他不可达原因
}
}
// 其他 ICMP 消息忽略,继续等待
}
}
int main(int argc, char *argv[])
{
if (argc != 2) {
fprintf(stderr, "用法: %s <目标IP或主机名>\n", argv[0]);
return 1;
}
// 解析目标地址
struct addrinfo hints, *res;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
if (getaddrinfo(argv[1], NULL, &hints, &res) != 0) {
fprintf(stderr, "无法解析: %s\n", argv[1]);
return 1;
}
struct sockaddr_in dest;
memcpy(&dest, res->ai_addr, sizeof(dest));
freeaddrinfo(res);
char ipstr[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &dest.sin_addr, ipstr, sizeof(ipstr));
printf("traceroute to %s (%s): %d hops max\n", argv[1], ipstr, MAX_TTL);
// 创建原始 ICMP 套接字(接收用)
g_recvfd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
if (g_recvfd < 0) { perror("raw socket(需要root)"); return 1; }
// 创建 UDP 套接字(发送探测包用)
g_sendfd = socket(AF_INET, SOCK_DGRAM, 0);
if (g_sendfd < 0) { perror("udp socket"); return 1; }
// 降权
setuid(getuid());
// 绑定源端口(用低16位PID | 0x8000 作为标识)
g_sport = (uint16_t)((getpid() & 0x7fff) | 0x8000);
struct sockaddr_in src;
memset(&src, 0, sizeof(src));
src.sin_family = AF_INET;
src.sin_port = htons(g_sport);
bind(g_sendfd, (struct sockaddr *)&src, sizeof(src));
signal(SIGALRM, sig_alrm);
char sendbuf[64] = {};
int done = 0;
int seq = 0;
// 外层循环:TTL 从 1 增加到 MAX_TTL
for (int ttl = 1; ttl <= MAX_TTL && !done; ttl++) {
// 设置 UDP 发送套接字的 TTL
setsockopt(g_sendfd, IPPROTO_IP, IP_TTL, &ttl, sizeof(ttl));
printf("%2d", ttl);
fflush(stdout);
// 内层循环:对同一 TTL 发 NPROBES 个探测包
for (int probe = 0; probe < NPROBES; probe++) {
// 每个探测包用不同目标端口(DEST_PORT + seq)
struct timeval tvsend;
gettimeofday(&tvsend, NULL);
// 把发送时间存入 payload,方便计算 RTT
memcpy(sendbuf, &tvsend, sizeof(tvsend));
dest.sin_port = htons(DEST_PORT + seq);
sendto(g_sendfd, sendbuf, sizeof(tvsend), 0,
(struct sockaddr *)&dest, sizeof(dest));
struct timeval tvrecv;
int ret = recv_icmp(seq, DEST_PORT, &tvrecv);
seq++;
if (ret == -3) {
// 超时
printf(" *");
} else {
// 计算 RTT
struct timeval tsdiff = tvrecv;
if ((tsdiff.tv_usec -= tvsend.tv_usec) < 0) {
tsdiff.tv_sec--;
tsdiff.tv_usec += 1000000;
}
tsdiff.tv_sec -= tvsend.tv_sec;
double rtt = tsdiff.tv_sec * 1000.0 + tsdiff.tv_usec / 1000.0;
printf(" %.3f ms", rtt);
if (ret == -1) done = 1; // 到达目标
}
fflush(stdout);
}
printf("\n");
}
close(g_recvfd);
close(g_sendfd);
return 0;
}
28.7 ICMP 消息守护进程(icmpd)
问题背景
普通 UDP 套接字几乎收不到异步 ICMP 错误(比如"目标不可达")。只有 connect 了之后才能通过 recvfrom 的 errno 得到,而且一个套接字发给多个目标时根本不知道哪个目标出了错。
解决方案:ICMP 守护进程
架构:
icmpd 守护进程
├── 原始 ICMPv4 套接字(接收所有 ICMP)
├── 原始 ICMPv6 套接字
└── Unix 域流式套接字(/tmp/icmpd,等待客户端连接)
应用程序(如 UDP echo 客户端)
├── UDP 套接字(正常通信用)
└── Unix 域套接字(连接 icmpd,接收错误通知)
用 Mermaid 展示通信流程:
ICMP 错误到 errno 的映射
| ICMP 错误 | errno 值 | 含义 |
|---|---|---|
| port unreachable(IPv4/v6) | ECONNREFUSED | 目标端口无人监听 |
| fragmentation needed(IPv4) | EMSGSIZE | 需要分片但DF=1 |
| packet too big(IPv6) | EMSGSIZE | 数据包太大 |
| time exceeded | EHOSTUNREACH | TTL超限,可能路由环路 |
| source quench(仅IPv4) | EHOSTUNREACH | 路由器丢包(已废弃) |
| 其他不可达 | EHOSTUNREACH | 其他原因到达不了 |
icmpd_err 结构体
struct icmpd_err {
int icmpd_errno; // 映射后的errno值(EHOSTUNREACH等)
char icmpd_type; // 原始ICMP类型
char icmpd_code; // 原始ICMP代码
socklen_t icmpd_len; // 后面sockaddr的长度
struct sockaddr_storage icmpd_dest; // 引发错误的目标地址(含端口)
};
总结对比
| 特性 | IPv4 | IPv6 |
|---|---|---|
| 选项位置 | 紧跟IP头,最多40字节 | 独立扩展头,无固定大小限制 |
| 源路由接口 | 直接操作字节(IP_OPTIONS) | 函数接口(inet6_rth_*) |
| 程序需懂格式 | 是 | 否 |
| ICMP校验和 | 应用程序计算 | 内核计算(需源地址) |
| 原始套接字收到的数据 | 含IPv4头 | 不含IPv6头 |
| 字节序(IP_HDRINCL时) | BSD派生:部分主机序 | 全部网络字节序 |
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐


所有评论(0)