Reactor 和 Proactor有什么区别?
Reactor 和 Proactor 是两种主流的异步 I/O 设计模式, 核心目标都是高效处理并发 I/O 操作,但它们的事件触发时机和职责划分有本质区别。
- Reactor:“被动触发” 模式, 内核通知 “可以做 I/O 了”,应用程序自己做 I/O(读 / 写)。
- Proactor: “主动完成” 模式, 应用程序告诉内核 “要做什么 I/O”,内核做完后通知 “已完成”,应用程序直接用结果。
实际开发中,Reactor 因兼容性好(跨平台)、实现简单而更常用(如 Nginx、Redis、libevent 均基于 Reactor);Proactor 多用于对性能要求极高且依赖特定系统(如 Windows 服务器)的场景。
单线程Reactor模型
单线程 Reactor 模型的工作流程
Reactor 是一种 “事件驱动” 的设计模式,单线程版本的执行逻辑分为 4 步:
-
步骤 1:Reactor 监控与分发事件Reactor 对象通过
select监控所有客户端的请求事件(如 “建立连接”“数据可读”);当select检测到事件后,Reactor 通过Dispatch(分发)机制,将事件交给对应的处理逻辑。 -
步骤 2:处理 “建立连接” 事件如果是建立连接的请求事件,由
A***eptor(负责 “接受连接” 的组件)调用A***ept处理连接请求;连接建立后,创建一个Handler对象,后续该连接的所有业务都由这个Handler处理。 -
步骤 3:处理 “非连接” 事件如果是非连接事件(如已连接的客户端发送了数据,触发 “数据可读”),Reactor 会分发调用该连接对应的
Handler,由Handler响应事件(如读数据、处理业务)。 -
步骤 4:Handler 完成业务闭环
Handler会执行 “Read(读数据)→ 业务处理(如计算、逻辑操作)→Send(发响应)” 的完整流程,负责一个连接的全生命周期业务。
3. 优点:模型简单
所有操作都在一个线程内完成,因此没有 “多线程竞争”“进程间通信” 等复杂问题,代码逻辑容易理解和实现。
4. 缺点:性能与阻塞问题突出
- 多核 CPU 性能浪费:单线程串行执行,无法利用多核 CPU 的并行能力,性能上限低。
-
业务阻塞导致全局阻塞:如果某个
Handler处理业务时阻塞(比如业务逻辑耗时久,或Read/Send操作本身阻塞),整个进程会卡住,其他连接的Handler都无法执行。 -
A***eptor 也被阻塞:更严重的是,
Handler的阻塞会导致 “新连接请求” 也无法被接受 —— 因为A***eptor和所有Handler都在同一个线程,select也会因线程阻塞而无法监控新事件。
这些缺陷导致单线程 Reactor 模型实际使用很少,通常作为 “Reactor 模式” 的基础概念,后续会演化出 “多线程 Reactor”“主从 Reactor” 等更高效的版本。
单Reactor多线程
一、核心组件与角色
- Client:客户端,发起网络请求(如建立连接、发送数据)。
-
Reactor 主线程:
- 核心能力:通过
select(IO 复用 API)监控多路客户端事件(连接请求、数据读写等),并通过dispatch(分发)将事件交给对应组件处理。 - 子组件:
-
A***eptor:专门处理 **“建立连接” 事件 **,调用a***ept完成 TCP 三次握手。 -
Handler:响应 **“非连接事件”(如数据可读),负责read(读数据)和send(发响应),但不做业务逻辑处理 **(业务逻辑交给 Worker 线程池)。
-
- 核心能力:通过
- Worker 线程池:由多个 Worker 线程组成,负责真正的业务逻辑处理(如数据计算、数据库操作等),与 Reactor 主线程解耦,避免主线程因业务阻塞。
二、工作流程(结合 “说明” 文字与箭头)
-
步骤 1:Reactor 监控并分发事件Reactor 通过
select持续监控所有客户端的事件(连接、数据读写等);当select检测到事件后,Reactor 用dispatch机制将事件分发给对应处理逻辑。 -
步骤 2:处理 “建立连接” 事件若事件是 **“建立连接请求”**,则由
A***eptor调用a***ept处理连接;连接建立后,创建对应的Handler对象,后续该连接的 IO 事件由这个Handler响应。 -
步骤 3:处理 “非连接” 事件若事件是 **“非连接事件”**(如已连接客户端发送数据,触发 “数据可读”),Reactor 会分发到该连接对应的
Handler进行响应。 -
步骤 4:Handler 转发任务到 Worker 线程池
Handler仅负责IO 层响应:通过read读取客户端数据后,不直接处理业务,而是将 “数据 + 业务任务” 分发给Worker 线程池。 -
步骤 5:Worker 线程处理业务逻辑
Worker 线程池分配独立线程,对收到的任务执行业务逻辑计算(如解码、业务逻辑处理、编码等);处理完成后,将结果返回给对应的Handler。 -
步骤 6:Handler 将结果返回给 Client
Handler收到 Worker 线程的处理结果后,通过send操作将响应数据返回给客户端(Client)。
三、设计意图与优势
-
解决单线程 Reactor 的缺陷:单线程 Reactor 中,
Handler处理业务会阻塞主线程,导致 “其他连接无法响应、新连接无法建立”。多线程版本将 **“业务逻辑” 剥离到 Worker 线程池 **,让 Reactor 主线程仅负责 “事件监控、IO 操作、分发”,避免阻塞。 -
性能与分工优化:
- Reactor 主线程专注 IO 和事件分发,效率高,可同时处理大量连接的事件。
- 业务逻辑由线程池并行执行,充分利用多核 CPU,提升整体吞吐量。
- 连接建立、IO 响应、业务处理职责解耦,代码模块化更清晰。
这种模型是 “单线程 Reactor” 的进化版,在高性能网络编程(如 Redis、***ty 早期设计)中被广泛借鉴,平衡了 “事件驱动的高效 IO” 与 “并行业务处理” 的需求。
主从Reactor多线程模型
一、背景:单 Reactor 多线程的瓶颈
在 “单 Reactor 多线程模型” 中,Reactor 主线程既要处理 “建立连接” 事件,又要分发所有 IO 事件。高并发场景下,Reactor 主线程会因 “事件监控 + 分发” 的压力成为性能瓶颈。
二、主从 Reactor 多线程的工作流程(方案说明)
模型将 Reactor 拆分为 MainReactor(主 Reactor) 和 SubReactor(从 Reactor),分工处理不同环节:
-
MainReactor 监控 “建立连接” 事件MainReactor(Reactor 主线程)通过
select只监控 **“建立连接” 的事件 **(如客户端 TCP 三次握手请求)。当检测到这类事件时,由A***eptor处理连接建立(执行a***ept操作)。 -
MainReactor 分配连接给 SubReactor连接建立后,MainReactor 会把新连接分配给某一个 SubReactor(Reactor 子线程)。
-
SubReactor 监控连接的 IO 事件SubReactor 将新连接加入自己的 “监控队列”,后续负责监听该连接上的所有 IO 事件(如数据可读、可写),并为该连接创建
Handler(处理事件的组件)。 -
SubReactor 分发事件给 Handler当连接上有 IO 事件(如客户端发数据),SubReactor 调用该连接对应的
Handler响应事件。 -
Handler 转发业务给 Worker 线程池
Handler通过Read读取数据后,不直接处理业务,而是将 “数据 + 业务任务” 分发给Worker 线程池。 -
Worker 线程池处理业务并返回结果
Worker 线程池分配独立线程执行业务逻辑(如计算、数据库操作),处理完成后将结果返回给Handler。 -
Handler 返回响应给 Client
Handler收到结果后,通过Send将响应发回客户端。
三、优点
-
职责明确,数据交互简单:
- MainReactor 仅负责 “接收新连接”,然后将连接交给 SubReactor;
- SubReactor 负责 “管理连接的 IO 事件”,再将业务交给 Worker 线程池;
- Worker 线程池仅负责 “业务逻辑”。各组件分工单一,数据交互(连接传递、任务分发、结果返回)逻辑清晰。
-
广泛应用于知名项目:该模型在工业界被广泛采用,例如:
- Nginx(主从 Reactor 多进程模型,用 “进程” 替代 “线程”,核心思路一致);
- Memcached(主从多线程模型);
- ***ty(支持主从多线程模型)。
核心设计意图
通过拆分 Reactor 为 “主 - 从” 结构,让 “新连接接入” 和 “已有连接的 IO 事件处理” 由不同 Reactor 线程负责,进一步提升高并发性能;同时保留 “Worker 线程池处理业务” 的设计,解耦 IO 与业务逻辑。
https://gitee.***/prawn-playfully/thread-pool/tree/master/Reactor 可通过这个网站查看代码
代码设计类图V1
类的说明
1. I***Address 类
- 职责:封装 ** 网络地址(IP + 端口)** 的操作(TCP 通信必须依赖网络地址)。
-
核心成员与方法:
-
_addr: struct sockaddr_in:存储 IPv4 地址结构体(包含 IP、端口等信息)。 - 构造函数:支持 “端口 + IP” 或 “直接传入
sockaddr_in” 创建地址对象。 -
ip()/port():获取 IP 字符串、端口号;getI***AddressPtr():返回sockaddr_in*指针,供底层 socket 调用(如bind、connect等)。
-
2. Socket 类
- 职责:管理套接字文件描述符(fd),利用 RAII 特性(资源获取即初始化)自动管理套接字生命周期(构造时创建、析构时关闭,避免资源泄漏)。
-
核心成员与方法:
-
_fd: int:套接字的文件描述符。 - 构造函数:支持 “默认创建 socket” 或 “用已有 fd 包装”。
-
fd():获取文件描述符;shutdownWrite():主动断开写连接(内部调用shutdown(_fd, SHUT_WR),表示后续不再发送数据,但仍可接收)。
-
3. SocketIO 类
- 职责:封装套接字的 IO 操作(底层数据收发的具体实现)。
-
核心成员与方法:
-
_fd: int:关联要操作的套接字 fd。 -
readn:接收固定字节数的数据;readline:按行接收数据(读取一行);sendn:发送固定字节数的数据 —— 这些方法封装了read/write等系统调用,处理了 “部分读写” 等细节。
-
4. A***eptor 类
- 职责:作为新连接的 “接收器”,负责服务器端 “监听端口、接受 TCP 连接” 的流程。
-
核心成员与方法:
-
_addr: I***Address:服务器要绑定的地址;_sock: Socket:用于 “监听” 的套接字。 - 构造函数:指定端口 / IP,初始化监听配置。
-
ready():整合 “绑定(bind)、监听(listen)” 等操作,让服务器进入 “可接受连接” 状态;setReuseAddr/setReusePort:设置 “地址 / 端口复用”(避免端口占用问题);a***ept():接受客户端连接,返回新连接的 fd。
-
5. TcpConnection 类
- 职责:代表一个已建立的 TCP 连接,提供 “接收 / 发送数据、管理连接生命周期” 的能力(支持多种收发方式,因此有函数重载)。
-
核心成员与方法:
-
_sock: Socket:管理连接的 fd;_socketIO: SocketIO:实际执行数据读写;_localAddr/_peerAddr:本地(服务器)和对端(客户端)的地址;_isShutdownWrite:标记是否已关闭写端。 -
receive():接收数据;send():发送数据(重载版本支持不同参数);shutdown():断开连接;toString():获取连接的 “元信息”(如两端 IP / 端口),用于调试。
-
-
依赖与协作:
A***eptor依赖I***Address(获取地址)和Socket(创建监听套接字)完成 “接受连接”;TcpConnection依赖Socket(fd 管理)、SocketIO(IO 操作)、I***Address(地址信息)完成 “连接收发数据”。
执行步骤
这套 TCP 网络编程类结构的执行流程,可分为服务器初始化、接受连接、数据收发、连接关闭四个核心阶段,各阶段由不同类协作完成:
一、服务器初始化阶段(I***Address + Socket + A***eptor)
-
I***Address封装网络地址:创建I***Address对象,指定服务器要监听的端口和IP(如I***Address(8080, "0.0.0.0"))。它内部将端口、IP 转换为struct sockaddr_in结构体,并提供ip()、port()等方法获取地址信息,getI***AddressPtr()则返回地址结构体指针,供底层 socket 调用。 -
Socket与A***eptor初始化监听:-
A***eptor构造时,传入I***Address(确定监听地址),并创建Socket对象(管理监听用的套接字 fd)。 -
A***eptor调用ready()方法,依次执行:-
setReuseAddr(true)/setReusePort(true):设置地址 / 端口复用(避免端口占用、快速重启服务器)。 -
bind():将I***Address封装的地址,绑定到Socket的文件描述符(_fd)。 -
listen():将Socket设为监听状态,准备接受客户端连接。
-
-
二、接受客户端连接阶段(A***eptor → TcpConnection)
-
A***eptor接受客户端连接:当客户端发起connect请求时,A***eptor的a***ept()方法被触发(通常由事件循环如epoll检测到 “监听套接字有连接请求”)。a***ept()调用系统a***ept接口,获取客户端的套接字 fd(clientFd)。 -
创建
TcpConnection管理连接:用clientFd实例化TcpConnection对象。TcpConnection内部:- 保存
Socket(封装clientFd,管理连接的生命周期)。 - 初始化
SocketIO(后续数据读写的工具类)。 - 获取并保存本地地址(服务器的
I***Address)和对端地址(客户端的I***Address)。
- 保存
三、数据收发阶段(TcpConnection + SocketIO)
-
接收客户端数据:调用
TcpConnection::receive(),内部通过SocketIO读取数据:-
SocketIO::readn(buf, len):读取固定长度(len字节)的数据到buf。 -
SocketIO::readline(buf, max):读取一行数据(遇到换行符为止)到buf,最多读max字节(避免缓冲区溢出)。
-
-
发送数据到客户端:调用
TcpConnection::send(const string& msg),内部通过SocketIO::sendn(buf, len)发送数据:SocketIO::sendn(buf, len)会将buf中前len字节的数据写入套接字,并处理 “部分写” 问题(确保数据完整发送)。
四、连接关闭阶段(TcpConnection + Socket)
-
主动断开连接:调用
TcpConnection::shutdown(),内部触发Socket::shutdownWrite()。Socket::shutdownWrite()调用系统shutdown(_fd, SHUT_WR),主动关闭连接的 “写端”(表示服务器不再向客户端发数据,但仍可接收)。 -
资源自动释放(RAII 机制):当
TcpConnection对象析构时,其内部的Socket也会析构。Socket的析构函数会调用close(_fd),利用 RAII(资源获取即初始化) 特性,自动关闭套接字 fd,避免资源泄漏。
辅助:调试与信息获取
-
TcpConnection::toString():用于调试,获取连接的 “五元组信息”(源 IP、源端口、目的 IP、目的端口、协议),方便排查网络问题。
这套结构通过分层封装,将 TCP 通信的底层细节(地址、套接字、IO 操作)隐藏在类中,上层只需通过简洁的接口(如 TcpConnection::send/receive)即可完成复杂通信,同时利用 RAII 保证资源安全,函数重载支持多种收发场景,提升了代码的可维护性。
类图V2
核心类的职责与协作
1. 基础工具类
-
I***Address:封装网络地址(struct sockaddr_in),提供 IP、端口的访问方法,让上层无需直接操作底层地址结构体。 -
Noncopyable:“不可拷贝” 基类(通过私有拷贝构造 / 赋值),让Socket等类继承后禁止拷贝,避免套接字 fd 等资源因拷贝导致的管理混乱(如重复关闭同一 fd)。 -
Socket:继承Noncopyable,利用 RAII(资源获取即初始化) 管理套接字 fd(构造时创建、析构时关闭),提供shutdownWrite(主动关闭写端)等操作。 -
SocketIO:封装套接字的 IO 细节(readn读固定字节、readline按行读、writen写固定字节),简化上层数据收发。
2. 服务器连接管理类
-
A***eptor:服务器 “新连接接收器”,依赖I***Address和Socket,完成 ** 绑定(bind)、监听(listen)、接受(a***ept)** 新连接的流程,生成客户端套接字 fd 后交给TcpConnection管理。 -
TcpConnection:代表一条已建立的 TCP 连接,包含连接的套接字(Socket)、IO 工具(SocketIO)、地址信息(localAddr/peerAddr),并通过 回调函数(onConnection连接建立、onMessage收到消息、onClose连接关闭)向业务层通知事件。使用TcpConnectionPtr(shared_ptr<TcpConnection>)管理生命周期,避免野指针。
3. 事件循环核心类
-
EventLoop:基于epoll的事件循环核心,管理所有需要监控的文件描述符(A***eptor的监听 fd、各TcpConnection的 fd)。当事件触发时(如新连接、数据可读),调用对应处理函数(handleNewConnection处理新连接、handleMessage处理已有连接消息),并触发TcpConnection的回调,完成业务逻辑分发。内部用_conns(map)存储活跃连接,方便管理。
二、关键设计特性
-
RAII 保障资源安全:
Socket构造时获取 fd,析构时关闭 fd,确保套接字资源不泄漏。 -
禁止拷贝避免混乱:
Noncopyable让Socket等类无法被拷贝,防止多个对象竞争同一 fd 导致的错误(如重复关闭)。 -
智能指针管理连接生命周期:
TcpConnectionPtr(shared_ptr)确保连接被多个组件(EventLoop、业务回调)引用时,能自动释放资源,避免野指针。 -
回调机制解耦业务与网络:
TcpConnection和EventLoop通过回调函数(如onConnection),让 “网络事件发生” 与 “业务逻辑处理” 解耦 —— 业务层只需注册回调,无需修改网络层代码。 -
IO 多路复用提升效率:
EventLoop基于epoll实现 “单线程监控多 fd 事件”,在高并发场景下仍能高效分发事件。
三、流程示例(新连接建立)
-
A***eptor初始化:绑定端口、启动监听,EventLoop将A***eptor的 fd 加入epoll监控。 - 客户端发起 TCP 连接,
epoll检测到A***eptor的 fd 有 “新连接请求” 事件。 -
EventLoop调用handleNewConnection,A***eptor执行a***ept得到客户端 fd。 - 创建
TcpConnection对象(用TcpConnectionPtr管理),并将客户端 fd 加入epoll监控。 - 触发
TcpConnection的onConnection回调,业务层执行 “连接建立后逻辑”(如记录连接、发送欢迎消息)。
总结
该框架通过面向对象封装(隐藏底层 socket 细节)、回调机制(解耦业务与网络)、C++ 资源管理特性(RAII、智能指针),实现了 “高效事件驱动 + 安全资源管理”,适合构建高性能 TCP 服务(如服务器、网关等)。
类图V3
TcpServer:服务器的「门面类」,封装 A***eptor 和 EventLoop,对外提供简洁接口(start 启动服务、stop 停止、setAllCallbacks 设置业务回调)。用户无需关心底层 A***eptor/EventLoop 细节,只需通过 TcpServer 即可快速搭建 TCP 服务。
类图V4
新增方法
1. TcpServer:服务器的 “门面封装”
它是对外的统一接口类,内部组合了 A***eptor(负责接受连接)和 EventLoop(负责事件循环)。开发者无需关注 A***eptor 如何监听、EventLoop 如何处理事件,只需通过 TcpServer 的 start()、setAllCallbacks() 等方法,就能快速启动 TCP 服务并绑定业务回调,极大简化了服务器的使用复杂度。
2. EventLoop 的 “多线程安全与异步任务” 能力
-
MutexLock(互斥锁):用于保护_pendingFunctors(待执行的回调队列)。因为runInLoop可能被其他线程(如线程池)调用,需要通过锁保证 “回调入队” 操作的原子性,避免多线程竞争。 -
PendingFunctors(待执行回调队列):EventLoop不仅处理网络 IO 事件,还支持异步执行任务。其他线程可通过runInLoop将回调放入该队列,由EventLoop所在的 IO 线程在合适时机(如epoll_wait间隙)执行,实现 “跨线程任务调度”。 -
runInLoop + wakeup配合:若当前线程不是EventLoop的 IO 线程,runInLoop会将回调入队后,通过wakeup(通常用eventfd或 pipe 实现)主动唤醒epoll_wait,让 IO 线程及时处理新加入的回调,解决 “其他线程如何通知 IO 线程执行任务” 的问题。
3. TcpConnection 的 “细化方法与线程安全逻辑”
新增 sendInLoop、handleReadCallback 等方法,强调:
-
线程内安全发送数据:
sendInLoop确保 “数据发送操作” 在EventLoop所在的 IO 线程执行(避免多线程直接操作套接字导致竞争)。 -
回调的具体执行:
handleReadCallback等方法是对onMessage等业务回调的 “封装执行”,保证回调在正确的线程上下文(IO 线程)中触发。
这些新增部分,让框架支持多线程协作(如线程池处理业务,IO 线程处理网络事件),同时通过封装和锁机制,保证了多线程场景下网络操作的安全性。
执行步骤
以下是该框架从服务器启动到处理客户端请求并响应的完整执行步骤,重点突出多线程协作、异步任务调度等核心流程(结合之前未详细展开的细节):
阶段 1:服务器初始化与启动
1. TcpServer 初始化(主线程执行)
- 开发者创建
TcpServer对象(如TcpServer server(8080)),TcpServer内部完成:- 初始化
A***eptor:绑定端口(8080)、设置监听套接字(Socket),并配置地址复用(SO_REUSEADDR)等参数。 - 初始化
EventLoop:创建epoll实例,将A***eptor的监听套接字 fd 加入epoll监控(关注EPOLLIN事件,即 “新连接请求”)。 - 注册业务回调:通过
server.setAllCallback(onConnection, onMessage, onClose),将用户定义的 “连接建立 / 消息到达 / 连接关闭” 回调函数保存到TcpServer中,后续传递给TcpConnection。
- 初始化
2. 启动服务器(主线程执行)
- 调用
server.start(),TcpServer触发EventLoop::loop(),启动事件循环:-
EventLoop进入while (_isLooping)循环,通过epoll_wait阻塞监听所有被监控的 fd(此时只有A***eptor的监听 fd)。
-
阶段 2:客户端连接建立(IO 线程执行)
3. 监听新连接事件(EventLoop 线程,即 IO 线程)
- 客户端通过
connect发起 TCP 连接,服务器监听套接字触发EPOLLIN事件,epoll_wait返回就绪事件。 -
EventLoop遍历就绪事件,发现是A***eptor的 fd,调用handleNewConnection()。
4. 接受连接并创建 TcpConnection(IO 线程执行)
-
A***eptor::a***ept()调用系统调用a***ept,获取客户端套接字 fd(peerfd)。 - 创建
TcpConnection对象(通过TcpConnectionPtr conn(new TcpConnection(peerfd, this)),this指向当前EventLoop):-
TcpConnection初始化:保存客户端 fd、本地地址(服务器)、对端地址(客户端),并关联SocketIO(用于数据收发)。 - 绑定回调:将
TcpServer传递的onConnection/onMessage/onClose回调保存到TcpConnection的_onConnection/_onMessage/_onClose成员。
-
-
EventLoop将客户端 fd 加入epoll监控(关注EPOLLIN事件,即 “数据可读”),并将conn存入_conns映射表(fd -> TcpConnectionPtr)。
5. 触发 “连接建立” 回调(IO 线程执行)
-
TcpConnection::handleNewConnectionCallback()被调用,执行_onConnection(conn)(用户注册的业务回调,如打印 “客户端已连接”)。
阶段 3:客户端发送数据与业务处理(多线程协作)
6. 检测数据可读事件(IO 线程执行)
- 客户端通过
send发送数据,客户端 fd 触发EPOLLIN事件,epoll_wait返回就绪事件。 -
EventLoop遍历就绪事件,找到对应客户端 fd 的TcpConnection(通过_conns映射表),调用handleMessage(fd)。
7. 读取数据并触发 “消息到达” 回调(IO 线程执行)
-
TcpConnection::handleMesssageCallback()被调用,内部通过SocketIO::readline读取客户端数据(如 “hello server”)。 - 执行
_onMessage(conn)(用户注册的回调,如onMessage函数):- 在
onMessage中,将读取到的数据封装为任务(如Mytask),通过gThreadpool.addtask(...)提交到线程池。
- 在
8. 线程池处理业务逻辑(线程池线程执行)
- 线程池的工作线程从任务队列中取出
Mytask,执行Mytask::process():- 处理业务(如拼接响应字符串 “xia hello server”)。
- 调用
_conn->sendInLoop(response),希望将响应发送给客户端(关键:此时在非 IO 线程,不能直接操作套接字,需通过EventLoop异步执行)。
9. 异步发送响应(跨线程协作)
-
TcpConnection::sendInLoop(response)调用_loop->runInLoop(...),将 “发送数据” 的 lambda 函数([this, msg]() { this->send(msg); })提交到EventLoop:-
runInLoop逻辑(线程池线程执行):- 加锁(
MutexLock),将 lambda 加入EventLoop的_pendingFunctors队列(待执行任务队列)。 - 调用
EventLoop::wakeup(),通过eventfd唤醒阻塞的epoll_wait:-
wakeup()向eventfd写入 8 字节数据(uint64_t one = 1),触发eventfd的EPOLLIN事件。
-
- 加锁(
-
阶段 4:IO 线程执行异步任务(IO 线程执行)
10. 唤醒 EventLoop 并处理异步任务
-
eventfd的EPOLLIN事件被epoll_wait检测到,EventLoop调用handleRead()读取eventfd数据(清空计数器,避免重复唤醒)。 - 调用
doPendingFunctors()执行队列中的任务:- 加锁,将
_pendingFunctors与局部functors交换(减少锁持有时间,避免阻塞其他线程提交任务)。 - 遍历
functors,执行 lambda 函数:this->send(msg)(TcpConnection::send)。
- 加锁,将
11. 发送响应数据(IO 线程执行)
-
TcpConnection::send通过SocketIO::writen将响应数据(“xia hello server”)写入客户端 fd,完成数据发送。
阶段 5:客户端断开连接(IO 线程执行)
12. 检测连接关闭事件
- 客户端调用
close断开连接,服务器端read会返回 0,TcpConnection::isClosed()检测到连接关闭。 -
EventLoop::handleClose(fd)被调用:- 将客户端 fd 从
epoll监控中移除(delEpollReadFd)。 - 从
_conns映射表中删除TcpConnection(shared_ptr引用计数减为 0,自动析构)。 - 触发
TcpConnection::handleCloseCallback(),执行_onClose(conn)(用户注册的回调,如打印 “客户端已断开”)。
- 将客户端 fd 从
核心设计亮点的执行体现
-
线程安全的异步任务:
runInLoop通过互斥锁保护_pendingFunctors,确保多线程提交任务安全;wakeup机制保证 IO 线程及时处理任务。 -
IO 与业务解耦:IO 操作(读写)在 IO 线程执行,业务逻辑(
Mytask::process)在线程池执行,避免业务阻塞 IO。 -
资源自动管理:
Socket析构关闭 fd,TcpConnection用shared_ptr自动释放,避免资源泄漏。
整个流程通过 “事件驱动” 和 “多线程协作”,既保证了 IO 效率,又支持业务并行处理,是高性能网络框架的典型实现
V5
V5是全封装
工作中的reactor
在工作中,几乎不会自己手写 Reactor 模式的完整实现,而是优先使用成熟的开源库。原因如下:
1. Reactor 模式的实现复杂度远超想象
Reactor 的核心是I/O 多路复用(如 epoll、kqueue、select)+ 事件分发 + 状态管理,但实际落地需要处理大量细节:
- 跨平台适配:不同操作系统的 I/O 多路复用接口差异(epoll 仅 Linux,kqueue 仅 BSD/Mac,IOCP 是 Windows 的异步模型),自己实现需兼容多套接口。
- 边缘情况处理:如文件描述符(FD)的动态增减、事件就绪后的错误处理(EAGAIN、EINTR)、信号与 I/O 事件的协同、惊群效应避免等。
- 性能优化:事件循环的效率(如减少系统调用、避免 FD 抖动)、内存池管理(减少动态分配)、多线程 Reactor 的负载均衡等。
- 扩展性:支持 TCP/UDP、定时任务、信号处理、SSL/TLS 等附加功能,手写成本极高。
这些细节稍有疏漏就可能导致 bug(如内存泄漏、死锁、性能瓶颈),甚至线上故障。
2. 成熟开源库已覆盖绝大多数场景
工业界有大量经过验证的 Reactor 库,稳定性和性能都远胜手写实现,常见的有:
-
C/C++:
-
libevent:最经典的跨平台 Reactor 库,支持 epoll/kqueue/select/IOCP,轻量且稳定,被 Nginx、Redis 等依赖。 -
libuv:Node.js 的底层事件库,基于 Reactor 模式,支持多平台,功能丰富(I/O、定时器、进程管理等)。 -
muduo:陈硕老师开源的 C++11 网络库,单线程 Reactor + 多线程 Worker 模式,代码风格清晰,适合学习和中小型项目。 -
Boost.Asio:Boost 库的一部分,封装了 Reactor(同步 I/O 多路复用)和 Proactor(异步 I/O),接口抽象度高,跨平台性好。
-
libevent库
#include <event2/event.h>
#include <event2/listener.h>
#include <arpa/i***.h>
// 客户端连接回调:新客户端连接时触发
void a***ept_conn(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *addr, int len, void *ptr) {
struct event_base *base = (struct event_base *)ptr;
// 创建 bufferevent 处理客户端通信
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) return;
// 设置回调
bufferevent_setcb(bev, bev_read_callback, NULL, bev_event_callback, NULL);
bufferevent_enable(bev, EV_READ); // 启用读事件
}
int main() {
struct event_base *base = event_base_new();
struct sockaddr_in sin;
sin.sin_family = AF_I***;
sin.sin_port = htons(8080); // 监听 8080 端口
sin.sin_addr.s_addr = INADDR_ANY;
// 创建监听器:接受客户端连接
struct evconnlistener *listener = evconnlistener_new_bind(
base, a***ept_conn, base,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, // 选项:端口复用、销毁时关闭
-1, (struct sockaddr*)&sin, sizeof(sin)
);
event_base_dispatch(base); // 启动事件循环
// 清理资源
evconnlistener_free(listener);
event_base_free(base);
return 0;
}