基于 epoll 的 Reactor 模式实现:从原理到百万并发实践

基于 epoll 的 Reactor 模式实现:从原理到百万并发实践

基于 epoll 的 Reactor 模式实现:从原理到百万并发实践

1. 引言

在现代网络编程中,高并发处理能力是衡量服务器性能的重要指标。传统的阻塞 I/O 模型在面对海量连接时显得力不从心,而 I/O 多路复用技术则为我们提供了高效的解决方案。本文将深入探讨如何使用 epoll 实现 Reactor 模式,并最终达到百万并发的处理能力。

1.1 什么是 Reactor 模式?

Reactor 模式是一种事件驱动的设计模式,专门用于处理多个并发服务请求。其核心思想是将每个服务请求分离,然后分派给相应的事件处理器

每一个io事件对应不同的回调函数,由io管理到io事件处理。Reactor 翻译过来的意思是「反应堆」,这里的反应指的是「对事件反应」,也就是来了一个事件,Reactor 就有相对应的反应/响应

2. epoll 机制深度解析

2.1 epoll 基本原理与性能分析

  • epoll 是 Linux 下高效的 I/O 多路复用接口,通过 epoll_createepoll_ctlepoll_wait 三个系统调用管理。
  • 内部实现(简化视角):
    • 使用红黑树 (rb-tree) 存储所有被监控的文件描述符 (FD)。
    • 当事件就绪后,把就绪 FD 加入一个 “就绪链表 (ready list)”。
    • epoll_wait 返回时,只遍历这个链表,而不是遍历所有 FD,从而实现事件通知效率优化。
  • 时间复杂度上,在多数就绪情况下可以近似 O(1),而不是 select/poll 那种 O(n) 全遍历方式
  • 触发模式:
    • 水平触发 (Level-Triggered, LT):只要缓冲区中还没读/写完,就会一直通知。
    • 边缘触发 (Edge-Triggered, ET):只有状态变化 (例如新数据到达) 时才通知。ET 效率高,但对程序的非阻塞性和缓冲管理要求更高。

3. Reactor 实现详解

  • Reactor 核心 (Event Loop)
  • A***eptor / Liste
    ner
    :负责 a***ept 新连接
  • Connection / Handler:每个连接一个 handler,处理读写逻辑
  • 回调机制:注册回调函数 (on_read, on_write, on_close)

这种分层设计,契合许多高性能框架的实践。

3.1 核心数据结构设计

struct conn {
    int fd;                    // 文件描述符
    
    char rbuffer[BUFFER_SIZE]; // 读缓冲区
    int rlength;               // 读数据长度
    
    char wbuffer[BUFFER_SIZE]; // 写缓冲区  
    int wlength;               // 写数据长度
    
    CALLBACK send_callback;    // 发送回调函数指针
    union {
        CALLBACK a***ept_callback;  // 接受连接回调
        CALLBACK recv_callback;    // 接收数据回调
    } r_ation;                 // 读操作回调
};

亮点:

  • 使用数组 conn_list[fd],可 O(1) 访问连接对象
  • 回调采用函数指针 + union,简化结构
  • 读写缓冲分离,便于扩展业务逻辑

3.2 回调机制:Reactor 的灵魂

3.2.1 初始化服务器

文件:init_server()


int init_server(unsigned short port){

    int sockfd = socket(AF_I***, SOCK_STREAM, 0); // 监听套接字

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_I***;
    servaddr.sin_port = htons(2000);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0

    // 绑定失败会返回-1
    if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind failed : %s\n", strerror(errno));
        close(sockfd);
        return 1;
    }

    if(listen(sockfd, 5) == -1) {
        printf("listen failed : %s\n", strerror(errno));
        close(sockfd);
        return 1;
    }
    
    printf("listen finshed!\n");
    return sockfd;
}

亮点:

  • 多端口监听,为百万并发准备(避免单端口高 backlog 压力)
  • listen backlog 虽然是 5,但内核一般允许更大的队列(可调优)

set_event(fd, EPOLLIN, 1);

参数含义:

参数 含义
event 监控事件,如 EPOLLIN
flag=1 新增事件
flag=0 修改事件

3.2.3 a***ept 回调
// 接受连接回调
int a***ept_cb(int fd) {
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int clientfd = a***ept(fd, (struct sockaddr *)&clientaddr, &len);
    
    if(clientfd < 0) {
        printf("a***ept errno : %d --> %s\n", errno, strerror(errno));
        return -1;
    }
    
    register_event(clientfd, EPOLLIN);  // 注册新连接的读事件
    return clientfd;
}

特点:

  • 非常适合测试百万连接的接入速度
  • 连接对象初始化在 register_event() 中完成

3.2.4 recv 回调
int recv_cb(int fd){
    memset(conn_list[fd].rbuffer, 0, BUFFER_SIZE);
    int count = recv(fd, conn_list[fd].rbuffer, conn_list[fd].rlength, 0);
    
    if(count <= 0){
        // 处理连接关闭或错误
        close(fd);
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        return -1;
    }
    
    conn_list[fd].rlength = count;
    // 数据处理:简单回显
    conn_list[fd].wlength = conn_list[fd].rlength;
    memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
    
    set_event(fd, EPOLLOUT, 0);  // 修改为监听写事件
    return count;
}


亮点:

  • 完整处理异常和客户端主动断开
  • 简单的 echo 逻辑
  • 事件从 EPOLLIN 修改为 EPOLLOUT(经典状态机)

3.2.5 send 回调
// 发送数据回调
int send_cb(int fd) {
    int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
    set_event(fd, EPOLLIN, 0);  // 发送完成后重新监听读事件
    return count;
}

当我们监听到连接时即listenfd是EPOLLIN,我们执行a***ept_cb(),当clientfd的事件是EPOLLIN时,我们执行recv_cb(),将事件切回EPOLLOUT,关注写事件,当clientfd的事件是EPOLLOUT,我们将事件切回EPOLLIN,关注读事件。

3.3 事件循环:Reactor 的心脏

int main() {
    // 初始化 epoll 和多端口监听
    epfd = epoll_create(1);
    for(int i = 0; i < MAX_PORTS; i++){
        int sockfd = init_server(port + i);
        conn_list[sockfd].fd = sockfd;
        conn_list[sockfd].r_ation.a***ept_callback = a***ept_cb;
        set_event(sockfd, EPOLLIN, 1);
    }

    // 主事件循环
    while (1) {
        struct epoll_event events[1024] = {0};
        int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; i++){
            int connfd = events[i].data.fd;
            
            if(events[i].events & EPOLLIN){
                conn_list[connfd].r_ation.recv_callback(connfd);
            }
            if(events[i].events & EPOLLOUT){
                conn_list[connfd].send_callback(connfd);
            }
        }
    }
    return 0;
}

这是 Reactor 的核心:事件分发 + 对应回调处理

完整代码:

#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <***i***/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/select.h>
// 实现百万并发


#define BUFFER_SIZE 1024
#define CONNECTION_SIZE 1048576  // 1024 * 1024

#define MAX_PORTS 20
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)


typedef int (* CALLBACK) (int fd);  // 函数指针类型定义 

struct conn {
    int fd;

    char rbuffer[BUFFER_SIZE];
    int rlength;

    char wbuffer[BUFFER_SIZE];
    int wlength;

    CALLBACK send_callback;
    union 
    {
        CALLBACK a***ept_callback;
        CALLBACK recv_callback;
    } r_ation;
    
};

int epfd = 0; // epoll文件描述符  需要初始化

struct timeval begin = {0, 0}; // 记录时间


struct conn conn_list[CONNECTION_SIZE] = {0};

int init_server(unsigned short port);
int set_event(int fd, int event, int flag);
int register_event(int fd, int event);
int a***ept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);

int init_server(unsigned short port){

    int sockfd = socket(AF_I***, SOCK_STREAM, 0); // 监听套接字

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_I***;
    servaddr.sin_port = htons(2000);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0

    // 绑定失败会返回-1
    if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind failed : %s\n", strerror(errno));
        close(sockfd);
        return 1;
    }

    if(listen(sockfd, 5) == -1) {
        printf("listen failed : %s\n", strerror(errno));
        close(sockfd);
        return 1;
    }
    
    printf("listen finshed!\n");
    return sockfd;
}

int set_event(int fd, int event, int flag) {

    if(flag) { // non-zero add
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = event; 

        epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
    }

    else { // zero mod
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = event; 

        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
    }
}

int register_event(int fd, int event) {

    if(fd < 0) return -1;

    conn_list[fd].fd = fd;
    conn_list[fd].r_ation.recv_callback = recv_cb;
    conn_list[fd].send_callback = send_cb;

    memset(conn_list[fd].rbuffer, 0, BUFFER_SIZE);
    conn_list[fd].rlength = BUFFER_SIZE;

    memset(conn_list[fd].wbuffer, 0, BUFFER_SIZE);
    conn_list[fd].wlength = BUFFER_SIZE;

    set_event(fd, event, 1);
}

int a***ept_cb(int fd) {
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = a***ept(fd, (struct sockaddr *)&clientaddr, &len);
    // printf("a***ept finished  : %d\n", clientfd);

    if(clientfd < 0) {  
        printf("a***ept errno : %d --> %s\n", errno, strerror(errno));
        return -1;
    }
    
    register_event(clientfd, EPOLLIN);

    if((clientfd % 1000) == 0) {  // 每接受1000个连接打印一次

        struct timeval now;
        gettimeofday(&now, NULL);

		int time_used = TIME_SUB_MS(now, begin);
		memcpy(&begin, &now, sizeof(struct timeval));

        printf("a***ept finshed: %d, time_used: %d\n", clientfd, time_used);
    }
    return clientfd;
}

int recv_cb(int fd){
    
    memset(conn_list[fd].rbuffer, 0, BUFFER_SIZE);
    int count = recv(fd, conn_list[fd].rbuffer, conn_list[fd].rlength, 0);

    if(count < 0){
        printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
		close(fd);

        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        return -1;
    }

    if(count == 0){
		printf("client disconnect: %d\n", fd);
		close(fd);
        
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        return -1;
    }

    conn_list[fd].rlength = count;
    // printf("recv: %s\n", conn_list[fd].rbuffer);
    
    // ------------
    conn_list[fd].wlength = conn_list[fd].rlength;
    memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
    // ------------

    set_event(fd, EPOLLOUT, 0);
    return count;
}

int send_cb(int fd) {
    int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);

    // printf("SEND: %s\n", conn_list[fd].wbuffer);
    set_event(fd, EPOLLIN, 0);
    return count;
}


int main() {

    unsigned short port = 2000;

    epfd = epoll_create(1); // 创建epoll实例

    for(int i = 0; i < MAX_PORTS; i++){
        int sockfd = init_server(port + i);

        conn_list[sockfd].fd = sockfd;
        conn_list[sockfd].r_ation.a***ept_callback = a***ept_cb;

        set_event(sockfd, EPOLLIN, 1);
    }

    gettimeofday(&begin, NULL);  // 记录起始时间

    while (1) {
        struct epoll_event events[1024] = {0};
		int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; i++){
            int connfd = events[i].data.fd;

            if(events[i].events & EPOLLIN){
                conn_list[connfd].r_ation.recv_callback(connfd);
            }
            if(events[i].events & EPOLLOUT){
                conn_list[connfd].send_callback(connfd);
            }
        }
    }
    
    return 0;
}



转载请说明出处内容投诉
CSS教程网 » 基于 epoll 的 Reactor 模式实现:从原理到百万并发实践

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买