一、背景介绍
在上一篇博客中,详细介绍了多路复用的select/poll/epoll算法,在学完epoll,我们已经能让一个线程同时盯住一百万个 fd,并在数据到达时立刻醒来;可翻开代码,我们依旧面临一堆“体力活”:
1. epoll_wait 返回后,一大坨 if/else 判断谁就绪;
2. 拼包、粘包、写不全的缓冲区要自己挪来挪去;
3. 加新协议?主循环又得继续“垒烟囱”;
4. 想从单线程改成“主从”或“线程池”?所有逻辑推倒重来。
即epoll只解决了“通知”的问题,却没有回答“通知来了以后,代码怎么长得好看、好维护、好扩展。于是人们把“通知”这件事再抽象一层,把事件分发、缓冲区管理、生命周期、多线程模型统统封装起来——这就是 Reactor(反应器)模式。它并不替代 epoll,而是站在 epoll 肩上,本文将详细介绍“reactor”框架。
二、reactor原理
2.1 基本定义
reactor是一种把I/O事件统一侦测、分派给用户回调执行,且线程绝不阻塞在 I/O 上的同步事件驱动设计模式。简单来说就是"事件到来 → 通知 → 回调" 的同步非阻塞网络模型。这里有四个关键词:
-
同步 - 读写操作仍在发起线程完成,不借助异步 I/O 内核完成通知
-
事件驱动 - 只关注"就绪/可写/错误"事件,事件到才回调
-
统一侦测 - 用 select/poll/epoll 等一次性收割所有 fd 事件
-
绝不阻塞 - 唯一可能阻塞的点是事件等待;读写本身非阻塞
Reactor 不是库,也不是框架,而是**"等待-分发-回调"循环**的抽象范式;任何语言、任何系统调用,只要满足上述四条,就称为 Reactor 实现。
2.2 四大核心角色
2.2.1 Handle事件源
-
定义:任何能产生事件的“东西”,多数时候它就是那个小小的int fd,但背后藏着一整套“从网卡到进程”的完整事件生产链。
-
作用:
-
给内核( Demultiplexer)提供“监视靶子”
-
给应用(Dispatcher) 提供“身份令牌”(通常就是 fd 值或指针)
-
-
生命周期:创建 → 注册到 Demultiplexer → 事件到达 → 分派 → 处理 → 关闭。
2.2.2 Synchronous Event Demultiplexer(同步事件分拣器)
Synchronous Event Demultiplexer(同步事件分离器)是 Reactor 模型中真正“让线程睡觉、又负责叫醒”的那一层。它并不处理业务,只干三件事:注册兴趣、阻塞等待、返回就绪列表。简单来说,它的职责就是:“线程你把要盯的 fd 交给我,然后睡觉;有活干了我叫醒你,还把就绪清单放桌上。”在 Linux 上,epoll 就是 Synchronous Event Demultiplexer 的事实标准实现。实现过程如下:
-
创建实例
int epfd = epoll_create1(0);实现初始化 就绪链表rdllist和等待队列wq。
-
注册兴趣
struct epoll_event ev = { .events = EPOLLIN, .data.ptr = conn }; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);内核为每个 fd 生成一个 struct epitem(epoll 的内核私有结构体),并把 epitem.rdllink (就绪链表节点)挂到 rdllist(若已就绪),同时把 epitem.pwqlist (私有等待队列) 与 socket 等待队列wq勾连。
-
阻塞等待
int n = epoll_wait(epfd, events, MAX, -1);检查rdllist是否为空,若为空 ,设置 TASK_INTERRUPYOBBLE 并 调度出去(CPU 0%)若rdllist 非空 则立即拷贝就绪项到用户空间并返回
-
数据到达唤醒链(以 TCP 可读为例)
网卡 → 软中断 → tcp_rcv() → sock_def_readable() → ep_poll_callback() → 把 epitem 插入 rdllist → __wake_up(&ep->wq) → 用户态线程恢复数据到达→socket 队列非空→ep_poll_callback 把 epitem 挂到就绪链表并唤醒线程,于是 epoll_wait 立即返回,用户回调被执行。
2.2.3 Dispatcher(分派器)——Reactor 的“事件调度总台”
分派器就是把 Synchronous Event Demultiplexer 返回的就绪事件列表,按 Handle 为单位路由到对应 Event Handler 的函数指针,并保证 O(1) 或 O(就绪数) 复杂度且不阻塞的纯路由层。简单来说就是把内核返回的就绪事件‘快递包裹’,按门牌号(fd)瞬间投送到正确的收件人(回调函数),保证整个 Reactor 循环既不迷路、也不堵车。示例如下:
for (i = 0; i < nready; i++) {
int fd = events[i].data.fd;
if (events[i].events & EPOLLIN)
conn_list[fd].r_action.recv_callback(fd);
if (events[i].events & EPOLLOUT)
conn_list[fd].send_callback(fd);
}
2.2.4 Event Handler(事件处理器)——Reactor 里真正“干活的人”
事件处理器就是将被 Dispatcher 调用的用户态函数/对象,只在 I/O 就绪时运行,必须非阻塞地完成业务逻辑、状态迁移和输出产生。其核心职责分为以下几个部分:
-
拼包 - 解决粘包/半包,把字节流拼成完整消息
-
算账 - 解码、校验、业务逻辑、状态机推进
-
写回 - 把响应写入缓冲区,注册可写事件
-
保状态 - 写不完时保留偏移,下次继续;事件兴趣按需翻转
Reactor 四大组件按事件驱动流水线耦合,协作关系如下:
-
Handle
向 Synchronous Event Demultiplexer 注册就绪兴趣;事件到达时内核将其插入就绪队列,成为后续分派的唯一身份令牌。 -
Synchronous Event Demultiplexer
阻塞等待内核就绪通知;返回时保证 rdllist 仅包含当前周期内状态变化的 Handle,复杂度 O(1) 于就绪数,零拷贝导出至用户空间。 -
Dispatcher
遍历就绪列表,按 Handle 索引调用对应 Event Handler 函数指针;自身无业务逻辑、无阻塞、无锁,路由复杂度 O(就绪数)。 -
Event Handler
执行非阻塞 I/O 与业务计算;通过修改缓冲区及事件掩码产生副作用,触发下一次兴趣注册,形成闭环状态机。
三、代码示例
本次示例的完整代码如下所示:
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <***i***/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#include "server.h"
#define CONNECTION_SIZE 1048576 // 1024 * 1024
#define MAX_PORTS 20
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int a***ept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int epfd = 0;
struct timeval begin;
//Handle事件源
struct conn conn_list[CONNECTION_SIZE] = {0};
// fd
int set_event(int fd, int event, int flag) {
if (flag) { // non-zero add
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
} else { // zero mod
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event) {
if (fd < 0) return -1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd, event, 1);
}
// listenfd(sockfd) --> EPOLLIN --> a***ept_cb
int a***ept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = a***ept(fd, (struct sockaddr*)&clientaddr, &len);
//printf("a***ept finshed: %d\n", clientfd);
if (clientfd < 0) {
printf("a***ept errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
event_register(clientfd, EPOLLIN); // | EPOLLET
if ((clientfd % 1000) == 0) {
struct timeval current;
gettimeofday(¤t, NULL);
int time_used = TIME_SUB_MS(current, begin);
memcpy(&begin, ¤t, sizeof(struct timeval));
printf("a***ept finshed: %d, time_used: %d\n", clientfd, time_used);
}
return 0;
}
int recv_cb(int fd) {
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if (count == 0) { // disconnect
printf("client disconnect: %d\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished
return 0;
} else if (count < 0) { //
printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
conn_list[fd].rlength = count;
//printf("RECV: %s\n", conn_list[fd].rbuffer);
#if 0 // echo
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);
#elif 0
http_request(&conn_list[fd]);
#else
ws_request(&conn_list[fd]);
#endif
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd) {
#if 0
http_response(&conn_list[fd]);
#else
ws_response(&conn_list[fd]);
#endif
int count = 0;
#if 0
if (conn_list[fd].status == 1) {
//printf("SEND: %s\n", conn_list[fd].wbuffer);
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLOUT, 0);
} else if (conn_list[fd].status == 2) {
set_event(fd, EPOLLOUT, 0);
} else if (conn_list[fd].status == 0) {
if (conn_list[fd].wlength != 0) {
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
}
set_event(fd, EPOLLIN, 0);
}
#else
if (conn_list[fd].wlength != 0) {
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
}
set_event(fd, EPOLLIN, 0);
#endif
//set_event(fd, EPOLLOUT, 0);
return count;
}
int init_server(unsigned short port) {
int sockfd = socket(AF_I***, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_I***;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
servaddr.sin_port = htons(port); // 0-1023,
if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
printf("bind failed: %s\n", strerror(errno));
}
listen(sockfd, 10);
//printf("listen finshed: %d\n", sockfd); // 3
return sockfd;
}
int main() {
unsigned short port = 2000;
epfd = epoll_create(1);
int i = 0;
for (i = 0;i < MAX_PORTS;i ++) {
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = a***ept_cb;
set_event(sockfd, EPOLLIN, 1);
}
gettimeofday(&begin, NULL);
while (1) { // mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
#if 0
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
} else if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
#else
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}
if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
#endif
}
}
}
在这段代码中体现了reactor的四大角色在代码中分别体现在:
| 角色 | 起始行号 | 关键符号 / 调用 | 功能描述 |
|---|---|---|---|
| Handle | ① 22 |
conn_list[fd] 下标即 fd |
事件源 = fd,O(1) 寻址 |
| Synchronous Event Demultiplexer | ② 39 | epoll_create(1) |
创建内核事件表 |
| ② 40-49 | set_event() |
epoll_ctl(ADD/MOD) 注册兴趣 |
|
| 251 | epoll_wait(..., -1) |
阻塞等待就绪清单 | |
| Dispatcher | ③ 257-267 | for (i = 0; i < nready; i++) |
按就绪掩码分派到回调指针 |
| Event Handler | ④ 71 | a***ept_cb() |
新连接事件处理器 |
| ④ 98 | recv_cb() |
读就绪事件处理器 | |
| ④ 140 | send_cb() |
写就绪事件处理器 |
更多知识详情可以查看此链接:https://github.***/0voice