基于 epoll 的 Reactor 模式实现:从原理到百万并发实践
1. 引言
在现代网络编程中,高并发处理能力是衡量服务器性能的重要指标。传统的阻塞 I/O 模型在面对海量连接时显得力不从心,而 I/O 多路复用技术则为我们提供了高效的解决方案。本文将深入探讨如何使用 epoll 实现 Reactor 模式,并最终达到百万并发的处理能力。
1.1 什么是 Reactor 模式?
Reactor 模式是一种事件驱动的设计模式,专门用于处理多个并发服务请求。其核心思想是将每个服务请求分离,然后分派给相应的事件处理器。
每一个io事件对应不同的回调函数,由io管理到io事件处理。Reactor 翻译过来的意思是「反应堆」,这里的反应指的是「对事件反应」,也就是来了一个事件,Reactor 就有相对应的反应/响应。
2. epoll 机制深度解析
2.1 epoll 基本原理与性能分析
-
epoll 是 Linux 下高效的 I/O 多路复用接口,通过
epoll_create、epoll_ctl、epoll_wait三个系统调用管理。 - 内部实现(简化视角):
- 使用红黑树 (rb-tree) 存储所有被监控的文件描述符 (FD)。
- 当事件就绪后,把就绪 FD 加入一个 “就绪链表 (ready list)”。
-
epoll_wait返回时,只遍历这个链表,而不是遍历所有 FD,从而实现事件通知效率优化。
- 时间复杂度上,在多数就绪情况下可以近似 O(1),而不是
select/poll那种 O(n) 全遍历方式 - 触发模式:
- 水平触发 (Level-Triggered, LT):只要缓冲区中还没读/写完,就会一直通知。
- 边缘触发 (Edge-Triggered, ET):只有状态变化 (例如新数据到达) 时才通知。ET 效率高,但对程序的非阻塞性和缓冲管理要求更高。
3. Reactor 实现详解
- Reactor 核心 (Event Loop)
-
A***eptor / Liste
ner:负责 a***ept 新连接 - Connection / Handler:每个连接一个 handler,处理读写逻辑
- 回调机制:注册回调函数 (on_read, on_write, on_close)
这种分层设计,契合许多高性能框架的实践。
3.1 核心数据结构设计
struct conn {
int fd; // 文件描述符
char rbuffer[BUFFER_SIZE]; // 读缓冲区
int rlength; // 读数据长度
char wbuffer[BUFFER_SIZE]; // 写缓冲区
int wlength; // 写数据长度
CALLBACK send_callback; // 发送回调函数指针
union {
CALLBACK a***ept_callback; // 接受连接回调
CALLBACK recv_callback; // 接收数据回调
} r_ation; // 读操作回调
};
亮点:
- 使用数组
conn_list[fd],可 O(1) 访问连接对象 - 回调采用函数指针 + union,简化结构
- 读写缓冲分离,便于扩展业务逻辑
3.2 回调机制:Reactor 的灵魂
3.2.1 初始化服务器
文件:init_server()
int init_server(unsigned short port){
int sockfd = socket(AF_I***, SOCK_STREAM, 0); // 监听套接字
struct sockaddr_in servaddr;
servaddr.sin_family = AF_I***;
servaddr.sin_port = htons(2000);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
// 绑定失败会返回-1
if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
printf("bind failed : %s\n", strerror(errno));
close(sockfd);
return 1;
}
if(listen(sockfd, 5) == -1) {
printf("listen failed : %s\n", strerror(errno));
close(sockfd);
return 1;
}
printf("listen finshed!\n");
return sockfd;
}
亮点:
- 多端口监听,为百万并发准备(避免单端口高 backlog 压力)
- listen backlog 虽然是
5,但内核一般允许更大的队列(可调优)
set_event(fd, EPOLLIN, 1);
参数含义:
| 参数 | 含义 |
|---|---|
event |
监控事件,如 EPOLLIN |
flag=1 |
新增事件 |
flag=0 |
修改事件 |
3.2.3 a***ept 回调
// 接受连接回调
int a***ept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = a***ept(fd, (struct sockaddr *)&clientaddr, &len);
if(clientfd < 0) {
printf("a***ept errno : %d --> %s\n", errno, strerror(errno));
return -1;
}
register_event(clientfd, EPOLLIN); // 注册新连接的读事件
return clientfd;
}
特点:
- 非常适合测试百万连接的接入速度
- 连接对象初始化在
register_event()中完成
3.2.4 recv 回调
int recv_cb(int fd){
memset(conn_list[fd].rbuffer, 0, BUFFER_SIZE);
int count = recv(fd, conn_list[fd].rbuffer, conn_list[fd].rlength, 0);
if(count <= 0){
// 处理连接关闭或错误
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return -1;
}
conn_list[fd].rlength = count;
// 数据处理:简单回显
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
set_event(fd, EPOLLOUT, 0); // 修改为监听写事件
return count;
}
亮点:
- 完整处理异常和客户端主动断开
- 简单的 echo 逻辑
- 事件从 EPOLLIN 修改为 EPOLLOUT(经典状态机)
3.2.5 send 回调
// 发送数据回调
int send_cb(int fd) {
int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLIN, 0); // 发送完成后重新监听读事件
return count;
}
当我们监听到连接时即listenfd是EPOLLIN,我们执行a***ept_cb(),当clientfd的事件是EPOLLIN时,我们执行recv_cb(),将事件切回EPOLLOUT,关注写事件,当clientfd的事件是EPOLLOUT,我们将事件切回EPOLLIN,关注读事件。
3.3 事件循环:Reactor 的心脏
int main() {
// 初始化 epoll 和多端口监听
epfd = epoll_create(1);
for(int i = 0; i < MAX_PORTS; i++){
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_ation.a***ept_callback = a***ept_cb;
set_event(sockfd, EPOLLIN, 1);
}
// 主事件循环
while (1) {
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
for(int i = 0; i < nready; i++){
int connfd = events[i].data.fd;
if(events[i].events & EPOLLIN){
conn_list[connfd].r_ation.recv_callback(connfd);
}
if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
}
}
return 0;
}
这是 Reactor 的核心:事件分发 + 对应回调处理
完整代码:
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <***i***/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/select.h>
// 实现百万并发
#define BUFFER_SIZE 1024
#define CONNECTION_SIZE 1048576 // 1024 * 1024
#define MAX_PORTS 20
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
typedef int (* CALLBACK) (int fd); // 函数指针类型定义
struct conn {
int fd;
char rbuffer[BUFFER_SIZE];
int rlength;
char wbuffer[BUFFER_SIZE];
int wlength;
CALLBACK send_callback;
union
{
CALLBACK a***ept_callback;
CALLBACK recv_callback;
} r_ation;
};
int epfd = 0; // epoll文件描述符 需要初始化
struct timeval begin = {0, 0}; // 记录时间
struct conn conn_list[CONNECTION_SIZE] = {0};
int init_server(unsigned short port);
int set_event(int fd, int event, int flag);
int register_event(int fd, int event);
int a***ept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int init_server(unsigned short port){
int sockfd = socket(AF_I***, SOCK_STREAM, 0); // 监听套接字
struct sockaddr_in servaddr;
servaddr.sin_family = AF_I***;
servaddr.sin_port = htons(2000);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
// 绑定失败会返回-1
if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
printf("bind failed : %s\n", strerror(errno));
close(sockfd);
return 1;
}
if(listen(sockfd, 5) == -1) {
printf("listen failed : %s\n", strerror(errno));
close(sockfd);
return 1;
}
printf("listen finshed!\n");
return sockfd;
}
int set_event(int fd, int event, int flag) {
if(flag) { // non-zero add
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else { // zero mod
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int register_event(int fd, int event) {
if(fd < 0) return -1;
conn_list[fd].fd = fd;
conn_list[fd].r_ation.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_SIZE);
conn_list[fd].rlength = BUFFER_SIZE;
memset(conn_list[fd].wbuffer, 0, BUFFER_SIZE);
conn_list[fd].wlength = BUFFER_SIZE;
set_event(fd, event, 1);
}
int a***ept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = a***ept(fd, (struct sockaddr *)&clientaddr, &len);
// printf("a***ept finished : %d\n", clientfd);
if(clientfd < 0) {
printf("a***ept errno : %d --> %s\n", errno, strerror(errno));
return -1;
}
register_event(clientfd, EPOLLIN);
if((clientfd % 1000) == 0) { // 每接受1000个连接打印一次
struct timeval now;
gettimeofday(&now, NULL);
int time_used = TIME_SUB_MS(now, begin);
memcpy(&begin, &now, sizeof(struct timeval));
printf("a***ept finshed: %d, time_used: %d\n", clientfd, time_used);
}
return clientfd;
}
int recv_cb(int fd){
memset(conn_list[fd].rbuffer, 0, BUFFER_SIZE);
int count = recv(fd, conn_list[fd].rbuffer, conn_list[fd].rlength, 0);
if(count < 0){
printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return -1;
}
if(count == 0){
printf("client disconnect: %d\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return -1;
}
conn_list[fd].rlength = count;
// printf("recv: %s\n", conn_list[fd].rbuffer);
// ------------
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
// ------------
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd) {
int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
// printf("SEND: %s\n", conn_list[fd].wbuffer);
set_event(fd, EPOLLIN, 0);
return count;
}
int main() {
unsigned short port = 2000;
epfd = epoll_create(1); // 创建epoll实例
for(int i = 0; i < MAX_PORTS; i++){
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_ation.a***ept_callback = a***ept_cb;
set_event(sockfd, EPOLLIN, 1);
}
gettimeofday(&begin, NULL); // 记录起始时间
while (1) {
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
for(int i = 0; i < nready; i++){
int connfd = events[i].data.fd;
if(events[i].events & EPOLLIN){
conn_list[connfd].r_ation.recv_callback(connfd);
}
if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
}
}
return 0;
}