zmq源码分析之signaler_t
·
文章目录
1. 设计目的
signaler_t 是 ZeroMQ 中的跨平台信号机制,用于:
- 线程间通知:不同线程之间的事件通知
- 事件触发:当有消息到达时通知用户
- 线程安全:确保信号的可靠传递
2. 核心实现
2.1 成员变量
class signaler_t
{
private:
// 写入端和读取端文件描述符
fd_t _w; // 写入端
fd_t _r; // 读取端
#ifdef HAVE_FORK
pid_t pid; // 进程ID,用于检测fork
#endif
};
2.2 构造函数
zmq::signaler_t::signaler_t ()
{
// 创建文件描述符对
if (make_fdpair (&_r, &_w) == 0) {
unblock_socket (_w); // 设置为非阻塞
unblock_socket (_r); // 设置为非阻塞
}
#ifdef HAVE_FORK
pid = getpid (); // 记录进程ID
#endif
}
2.3 发送信号
void zmq::signaler_t::send ()
{
#ifdef HAVE_FORK
if (unlikely (pid != getpid ())) {
return; // 子进程不发送信号
}
#endif
#if defined ZMQ_HAVE_EVENTFD
const uint64_t inc = 1;
ssize_t sz = write (_w, &inc, sizeof (inc));
errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
const char dummy = 0;
int nbytes;
do {
nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
wsa_assert (nbytes != SOCKET_ERROR);
} while (nbytes == SOCKET_ERROR);
zmq_assert (nbytes == sizeof (dummy));
#else
unsigned char dummy = 0;
while (true) {
ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
if (unlikely (nbytes == -1 && errno == EINTR))
continue;
zmq_assert (nbytes == sizeof dummy);
break;
}
#endif
}
2.4 接收信号
void zmq::signaler_t::recv ()
{
#if defined ZMQ_HAVE_EVENTFD
uint64_t dummy;
ssize_t sz = read (_r, &dummy, sizeof (dummy));
errno_assert (sz == sizeof (dummy));
// 处理多个信号的情况
if (unlikely (dummy > 1)) {
const uint64_t inc = dummy - 1;
ssize_t sz2 = write (_w, &inc, sizeof (inc));
errno_assert (sz2 == sizeof (inc));
return;
}
zmq_assert (dummy == 1);
#else
unsigned char dummy;
// 读取一个字节
ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
errno_assert (nbytes >= 0);
zmq_assert (nbytes == sizeof (dummy));
zmq_assert (dummy == 0);
#endif
}
2.5 等待信号
int zmq::signaler_t::wait (int timeout_) const
{
#ifdef HAVE_FORK
if (unlikely (pid != getpid ())) {
errno = EINTR;
return -1;
}
#endif
#ifdef ZMQ_POLL_BASED_ON_POLL
struct pollfd pfd;
pfd.fd = _r;
pfd.events = POLLIN;
const int rc = poll (&pfd, 1, timeout_);
if (unlikely (rc < 0)) {
errno_assert (errno == EINTR);
return -1;
}
if (unlikely (rc == 0)) {
errno = EAGAIN;
return -1;
}
zmq_assert (rc == 1);
zmq_assert (pfd.revents & POLLIN);
return 0;
#elif defined ZMQ_POLL_BASED_ON_SELECT
// 使用select实现
// ...
#endif
}
3. 跨平台实现
3.1 Unix 平台
- 使用 socketpair:创建一对相互连接的 socket
- send/recv:通过 socket 发送/接收信号
- poll/select:等待信号
3.2 Windows 平台
- 使用 socket:创建 TCP socket
- WSA 函数:使用 Windows 套接字 API
- select:等待信号
3.3 特殊平台
- eventfd:部分平台支持 eventfd,更高效
- VXWORKS:特殊处理 VXWORKS 平台
4. 技术要点
4.1 非阻塞设计
- unblock_socket:设置文件描述符为非阻塞
- 错误处理:处理 EINTR、EAGAIN 等错误
- 重试机制:在遇到中断时重试
4.2 线程安全
- 无锁设计:信号机制本身是线程安全的
- 原子操作:eventfd 使用原子操作
- fork 处理:在子进程中重新创建文件描述符对
4.3 性能优化
- 最小数据:只发送一个字节,最小化开销
- 批量处理:eventfd 支持批量信号
- 非阻塞:避免阻塞线程
4.4 错误处理
- 健壮性:处理各种错误情况
- 断言检查:使用 zmq_assert 确保操作成功
- 资源管理:正确关闭文件描述符
5. 使用场景
5.1 线程安全 Socket
线程安全的 socket 使用 signaler:
- PUB、SUB、DEALER、ROUTER 等线程安全的 socket
- 当有消息到达时,通过 signaler 通知用户
5.2 多线程通信
不同线程之间的通知:
- IO 线程通知应用线程
- 工作线程之间的协调
- 事件触发机制
5.3 事件轮询
与 poll/select 配合:
- 将 signaler 的文件描述符加入 poll 集合
- 当有信号时,poll 会返回
- 然后检查具体的事件
7. 总结
signaler_t 是 ZeroMQ 中高效、跨平台的信号机制:
-
设计简洁:
- 只发送一个字节,最小化开销
- 跨平台实现,支持不同操作系统
-
功能强大:
- 支持线程间通知
- 与 poll/select 无缝集成
- 处理 fork 等特殊情况
-
性能高效:
- 非阻塞设计
- 最小化系统调用
- 批量处理能力
-
可靠性:
- 健壮的错误处理
- 线程安全
- 资源管理
这种设计使得 ZeroMQ 能够高效地处理线程间通信,是其高性能的重要基础。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)