前言
在高并发成为系统标配的今天,网络编程、中间件开发、分布式通信等场景中,“如何高效处理海量 IO 请求” 始终是开发者绕不开的核心命题。传统 “一连接一线程” 的同步阻塞模型,早已因线程资源耗尽、CPU 上下文切换频繁、内存占用过高等问题,难以应对万级甚至十万级的并发连接;即便引入线程池优化,也无法从根本上解决 “等待 IO 时线程闲置” 的资源浪费困境。
正是在这样的需求下,基于 “事件驱动” 与 “IO 多路转接” 的 Reactor 模式应运而生 —— 它以 “少量线程监听多 IO、事件触发业务处理” 的核心逻辑,成为解决高并发 IO 的经典架构:小到 ***ty 的网络通信内核、Redis 的事件循环,大到 Nginx 的请求处理框架、Kafka 的消息接收模块,其底层都能看到 Reactor 模式的影子。可以说,理解 Reactor 模式的实现逻辑,是掌握高并发系统设计的 “关键钥匙”。
本文正是围绕 “Reactor 模式实现” 展开:不局限于抽象原理,而是从底层技术依赖(IO 多路转接调用)切入,一步步拆解事件循环的构建、组件间的协作逻辑,手把手帮助你构建出一个基于Reactor 模式的服务器。
一. Epoll的工作模式
Epoll有两种工作模式:水平触发(Level Triggered,简称 LT) 和 边缘触发(Edge Triggered,简称 ET)。这两种模式的核心差异在于 “何时通知应用程序某个文件描述符(fd)就绪”,直接影响高并发 IO 处理的效率和编程复杂度。
- 水平触发(LT):默认模式,“状态持续” 触发:
当一个文件描述符(如 socket)处于就绪状态(例如:有数据可读、可写,或发生异常)时,epoll 会持续通知应用程序,直到该就绪状态被 “消除”(例如:数据被完全读取、缓冲区被写满)。
- 边缘触发(ET):“状态变化” 触发,高效但复杂:
epoll 仅在文件描述符的就绪状态发生 “变化瞬间” 通知一次,之后无论该状态是否持续,都不再通知。,即只有在读写资源从没就绪多就绪的时候才会进行通知。
我们通常会认为ET模式的效率更高:
- ET当资源就绪的时候只会通知一次,并不需要反复通知。并且如果上层没有将数据读取完毕,也不会再进行通知了;
- 因为ET模式只会进行通知一次,因此其==会倒逼着上层在进行读取时要将数据一次全部取完,这样就可以空出一个更大的接收缓冲区,对方也可以发送更多的。
二. Reactor 服务器
一下我们开始进行基于 Reactor 模式设计的高性能网络服务器,通过 “事件驱动” 和 “IO 多路转接” 技术,高效处理海量并发连接。
2.1 对网络套接字进行封装
关于网络套接字可以查看,我之前写的关于TCP的文章,改内容并不是本文的重点,所以此处直接贴实现代码了:
const std::string defaultip_ = "0.0.0.0";
enum SockErr
{
SOCKET_Err,
BIND_Err,
};
class Sock
{
public:
Sock(uint16_t port)
: port_(port),
listensockfd_(-1)
{
}
void Socket()
{
listensockfd_ = socket(AF_I***, SOCK_STREAM, 0);
if (listensockfd_ < 0)
{
Log(Fatal) << "socket fail";
exit(SOCKET_Err);
}
Log(Info) << "socket sucess";
}
void Bind()
{
struct sockaddr_in server;
server.sin_family = AF_I***;
server.sin_port = htons(port_);
i***_pton(AF_I***, defaultip_.c_str(), &server.sin_addr);
if (bind(listensockfd_, (struct sockaddr *)&server, sizeof(server)) < 0)
{
Log(Fatal) << "bind fail";
exit(BIND_Err);
}
Log(Info) << "bind sucess";
}
void Listen()
{
if (listen(listensockfd_, 10) < 0)
{
Log(Warning) << "listen fail";
}
Log(Info) << "listen sucess";
}
int A***ept()
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int fd = a***ept(listensockfd_ , (sockaddr*)&client , &len);
return fd;
}
int A***ept(std::string& ip , uint16_t& port)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int fd = a***ept(listensockfd_ , (sockaddr*)&client , &len);
port = ntohs(client.sin_port);
char bufferip[64];
i***_ntop(AF_I*** , &client.sin_addr , bufferip , sizeof(bufferip) - 1);
ip = bufferip;
return fd;
}
int Get_fd()
{
return listensockfd_;
}
~Sock()
{
close(listensockfd_);
}
private:
uint16_t port_;
int listensockfd_;
};
2.2 对Epoll接口进行封装
关于Epoll具体的细节,可以查看之前关于关于Epoll的文章,此处我们直接对封装的接口进行使用:
enum EpollErr
{
CREAR_Err,
};
class Epoll
{
public:
Epoll()
{
// 创建epoll模型
_epfd = epoll_create(1);
if (_epfd < 0)
{
Log(Fatal) << "epoll_create fail";
exit(CREAR_Err);
}
Log(Info) << "epoll create sucess ";
}
void Add_fd(int fd, uint32_t event)
{
// 添加文件描述符到红黑树中
struct epoll_event epevt;
epevt.events = event;
epevt.data.fd = fd;
if (epoll_ctl(_epfd, EPOLL_CTL_ADD , fd, &epevt) < 0)
{
Log(Warning) << "epoll add error : " << strerror(errno);
}
Log(Info) << "epoll add sucess , fd : " << fd ;
}
void Del_fd(int fd)
{
// 删除要进行等待的文件描述符
if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr) < 0)
{
Log(Warning) << "epoll del error : " << strerror(errno);
}
Log(Info) << "epoll del sucess , fd : " << fd;
}
void Mod_fd(int fd, uint32_t event)
{
// 对文件描述符的事件进行修改
struct epoll_event epevt;
epevt.events = event;
epevt.data.fd = fd;
if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &epevt) < 0)
{
Log(Warning) << "epoll mod error : " << strerror(errno);
}
}
int Wait(struct epoll_event *ep_array, int max_size, int timeout)
{
// 进行等待
return epoll_wait(_epfd, ep_array, max_size, timeout);
}
private:
int _epfd;
};
2.3 设计一个管理连接的类
因为TCP通信传递的是字节流,因此我们无法确定每次获取到的数据是一个有效报文,因此我们需要将所有获取到的数据都先存储起来:
- 我们需要一个整形,存储连接对应的文件描述符;
- 需要两个缓冲区:输入缓冲区和输出缓冲区;
-
当然为将代码的耦合性尽量降低一些,此处我们将不同文件描述符处理读写以及异常事件的方法也放到
Connection类中。
这些方法的参数统一都设置为:std::shared_ptr<Connection>来保证当跳转到外界去进行代码的执行时,依旧可以拿到文件描述符的相关资源。
class Connection;
using func_t = std::function<void(std::shared_ptr<Connection>)>;
class Connection
{
public:
Connection(int fd , func_t recv , func_t sender , func_t exception)
:_fd(fd) ,
_Recv(recv) , _Sender(sender) , _Exception(exception)
{
}
private:
int _fd; // 对应的文件描述符
std::string _inbuffer ; // 输入缓冲区
std::string _outbuffer; // 输出缓冲区
public:
func_t _Recv; // 处理接收的逻辑
func_t _Sender; // 处理发送的逻辑
func_t _Exception; // 处理出现异常时的逻辑
};
在该类中,毫无疑问我们在后续需要先缓冲区中进行读写操作:
std::string& Get_Inbuffer()
{
return _inbuffer;
}
std::string& Get_Outbuffer()
{
return _outbuffer;
}
void Add_In(const std::string& mes)
{
_inbuffer += mes;
}
void Add_Out(const std::string& mes)
{
_outbuffer += mes;
}
int Get_fd()
{
return _fd;
}
可能后续还需要使用一些操作,在后面再进行补充。
2.4 设计 Reactor服务器 类
- 需要一个Sock对象来从网路中获取客户端的连接;
- 需要一个Epoll对象来使用
epoll多路转接的接口; -
使用一个哈希表来存储每一个文件描述符与之对应的
Connection资源,方便我们后面获取一个文件描述符的输入缓冲区和输出缓冲区; - 还需要一个缓冲区,负责接收epoll模型等待结束后返回的就绪队列中的文件描述符信息。
class Rserver
{
static const int array_num_max = 1024;
public:
Rserver(uint16_t port)
:_sock_ptr(new Sock(port)) ,
_epoll_ptr(new Epoll)
{}
private:
std::shared_ptr<Sock> _sock_ptr;
std::shared_ptr<Epoll> _epoll_ptr;
std::unordered_map<int , std::shared_ptr<Connection> > _connections;
struct epoll_event _epl_array[array_num_max];
};
2.5 将文件描述符设置为非阻塞
在ET模式下,我们要保证一次将所有的资源都获取上来,因此我们需要while式的对资源进行读取,这就使得如果没有资源了我们也不能让其堵塞住,因此要将所有文件描述符设置为非阻塞状态。
此时使用int f***tl(int fd, int op , ... )接口进行设置:
int SetNoBlock(int fd)
{
int fl = f***tl(fd, F_GETFL);
fl |= O_NONBLOCK;
int n = f***tl(fd, F_SETFL, fl);
return n;
}
2.6 所有文件描述符的处理方法
2.6.1 普通文件描述符的处理方法
首先就是普通文件的接收方法:
- 将缓冲区中的数据全部读取到connection中;
- 调用外界函数判断是否含有一个完成的报文;
- 含有完整报文就进行处理。
对于第二步,我们可以先外界开放一个接口,让外界将数据进行处理,将处理好的数据再给我,由服务器进行发送,因此我们在服务端的类中添加一个成员,负责回调:
using callback_func = std::function<std::string(std::shared_ptr<Connection>)>;
class Rserver
{
static const int array_num_max = 1024;
public:
Rserver(uint16_t port , callback_func Onmessage)
:_sock_ptr(new Sock(port)) ,
_epoll_ptr(new Epoll) ,
_Onmessage(Onmessage)
{}
private:
std::shared_ptr<Sock> _sock_ptr;
std::shared_ptr<Epoll> _epoll_ptr;
std::unordered_map<int , std::shared_ptr<Connection> > _connections;
struct epoll_event _epl_array[array_num_max];
callback_func _Onmessage; // 负责回调
};
关于普通文件描述符的接受问题,需要注意的就是read的不同返回值进行不同的处理:
void Recv(std::shared_ptr<Connection> con_ptr)
{
// 1. 将缓冲区中的数据全部读取到Connection中
// 2. 调用外界函数判断是否含有一个完成的报文
// 3. 先客户端返回结果
char inbuffer[1024];
while(1)
{
int n = read(con_ptr->Get_fd() , inbuffer , sizeof(inbuffer) - 1);
if(n > 0)
{
// 有数据
inbuffer[n] = 0;
con_ptr->Add_In(inbuffer);
}
else if(n == 0)
{
// 对方关闭了文件 , 断开连接了
// 1. 将文件描述符从epoll模型中移除
// 2. 将文件描述符从哈希表中移除
// 3. 将文件描述符关闭
int fd = con_ptr->Get_fd();
_epoll_ptr->Del_fd(fd);
_connections.erase(fd);
close(fd);
return;
}
else
{
// 此次有两种情况: 1. 数据读取完了 2. 读取出错了
if(errno == EAGAIN) // 读取完了
{
break;
}
else // 出错了
{
// 此处调用文件对应的异常处理
con_ptr->_Exception(con_ptr);
return;
}
}
}
std::string ret = _Onmessage(con_ptr);
con_ptr->Add_Out(ret);
}
接下来就是编写发送的接口:
思考:对于发送接口是否需要判断,写事件是否就绪???
在大多数时候,写事件都是就绪的;因此如果将其加入到判断中epoll_wait就会频繁的进行返回,会影响效率;所以一般不对写事件加入到等待中,除非写缓冲区满了,此时才将写加入到等待中。
- 在代码中表现为:在调用
write接口的时候,实际写入的大小比我字符串要小。
void Sender(std::shared_ptr<Connection> con_ptr)
{
// 进行数据的发送
// 直接进行发送
std::string& outbuffer = con_ptr->Get_Outbuffer();
int fd = con_ptr->Get_fd();
// 循环式的进行发送
while(1)
{
int n = write(fd , outbuffer.c_str() , outbuffer.size());
if(n > 0)
{
// 1. 将已经发送的数据从字符串中移除
outbuffer.erase(0 , n);
if(outbuffer.empty()) break; // 已经写完了
}
else if(n == 0)
{
break;
}
else
{
if(errno == EAGAIN) // 已经写完了
break;
else // 出错了
{
// 此处调用文件对应的异常处理
con_ptr->_Exception(con_ptr);
return;
}
}
}
// 判断发送缓冲区中是否还有数据
if(!outbuffer.empty())
{
// 发送缓冲区满了
_epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLOUT | EPOLLET);
}
else
{
// 缓冲区没满 , 不需要对写事件进行检测
_epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLET);
}
}
最后一步就是对异常情况的处理了:
- 打印日志信息;
- 将文件描述符从
epoll模型从移除; - 将文件描述符从哈希表中移除;
- 关闭文件描述符。
void Exception(std::shared_ptr<Connection> con_ptr)
{
int fd = con_ptr->Get_fd();
_epoll_ptr->Del_fd(fd);
_connections.erase(fd);
close(fd);
}
2.6.2 套接字的处理方法
对于套接字来说,只需要负责将建立好的链接拿上来就行了,不需要进行写入和异常处理。
在创建为新的文件描述符创建Connection对象的是时候,我们需要传入可执行对象,但是我们在进行统一接口的时候参数都是std::shared_ptr<Connection>,并且上述的Recv,Sender,Expection都是类成员函数,都有一个隐含的参数this指针,所以对于可调用对象在进行传参的是否要使用bind进行绑定。
void A***ept(std::shared_ptr<Connection> con_ptr)
{
// 1. 获取文件描述符
while (1)
{
int newfd = _sock_ptr->A***ept();
if (newfd >= 0)
{
// 有新连接
// 2. 将文件描述符设置为非阻塞
// 3. 将文件加入到epoll模型中
// 4. 将文件描述符加入到哈希表中
if(SetNoBlock(newfd) < 0)
{
Log(Warning) << "set no block fail";
continue;
}
_epoll_ptr->Add_fd(newfd , EPOLLIN | EPOLLET);
std::shared_ptr<Connection> con_ptr(new Connection(newfd ,
std::bind(&Rserver::Recv , this , std::placeholders::_1),
std::bind(&Rserver::Sender , this , std::placeholders::_1),
std::bind(&Rserver::Exception , this , std::placeholders::_1)
));
_connections.emplace(newfd , con_ptr);
}
else
{
if(errno == EAGAIN) break;
else
{
// 出错了
Log(Warning) << "a***ept fail";
}
}
}
}
2.7 初始化服务器
- 创建套接字;
- 进行绑定;
- 设置监听模式;
- 将网络套接字加入到epoll模型中,并创建connection加入到_connections中进行管理;
- 在创建
Connection对象的时候,我们还需要设计一个套接字的Recv方法.
关于建立好的
void Init()
{
// 1. 创建套接字
// 2. 进行绑定
// 3. 设置监听模式
// 4. 将网络套接字加入到epoll模型中,并创建Connection加入到_connections中进行管理
_sock_ptr->Socket();
_sock_ptr->Bind();
_sock_ptr->Listen();
int listensock = _sock_ptr->Get_fd();
SetNoBlock(listensock);
_epoll_ptr->Add_fd(listensock , EPOLLIN | EPOLLET);
std::shared_ptr<Connection> conptr(new Connection(listensock,
std::bind(&Rserver::A***ept , this , std::placeholders::_1),
nullptr, nullptr));
_connections.emplace(listensock, conptr);
// 将IP和端口号设置为可复用的
int opt = 1;
setsockopt(listensock , SOL_SOCKET , SO_REUSEADDR | SO_REUSEPORT , &opt , sizeof(opt));
}
2.8 进行任务派发
因为我们之前已经将每个文件描述符对应的处理方法加入到了Connection对象中了,因此直接进行调用即可。
在进行任务派发的时候有一个细节:可以将异常处理嫁接到读写事件中的异常处理,这样就不需要再单独对异常进行处理了。
void Dispatcher(int n)
{
for (int i = 0; i < n; i++)
{
int fd = _epl_array[i].data.fd;
short events = _epl_array[i].events;
auto &con_ptr = _connections[fd];
// 将异常处理, 转化为读写处理
if (events & EPOLLERR)
{
events |= (EPOLLIN | EPOLLOUT);
}
if (_connections.count(fd) && con_ptr->_Recv)
{
con_ptr->_Recv(con_ptr);
}
if (_connections.count(fd) && con_ptr->_Sender)
{
con_ptr->_Sender(con_ptr);
}
}
}```
## 服务器的主循环
服务器的主循环就比较简单了,直接进行`epoll_wait`即可,将操作系统中的就绪队列拿到:
```cpp
void Run()
{
while (1)
{
int n = _epoll_ptr->Wait(_epl_array, array_num_max, -1);
if (n > 0)
{
Dispatcher(n);
}
else if (n == 0)
{
Log(Info) << "no message";
}
else
{
Log(Warning) << "epoll wait fail";
}
}
}
以上就是整个服务器的实现过程了,下面我们对服务器接入一下事件,让服务器能够处理一些业务。
三. 补充
3.1 实现在线计算器
此处我们引入之前:手动私下序列化和换序列化的代码,来实现一个手动计算器:
std::string Onmessge(std::shared_ptr<Connection> con_ptr)
{
static Calculator cal;
std::string& inbuffer = con_ptr->Get_Inbuffer();
std::string ret = cal(inbuffer); // 对请求进行处理 , 返回一个序列化后的字符串
return ret;
}
3.2 引入线程池
对于引入线程池,此代码就需要进行重构了,在Connection对象中我们需要存储一个Server的回指指针,但是此处不能直接使用shared_ptr<>否则会出现循环引用,因此要采用weak_ptr来实现。
但是注意:我们是在类的成员函数中使用其this指针来构建一个sharead_ptr,从而初始化weak_ptr;
如果在类的成员函数中,直接通过 this 指针创建新的 shared_ptr,会导致两个独立的 shared_ptr 管理同一个对象,但它们的引用计数是分开的:
- 原有的
shared_ptr(创建服务器时候的)计数减到 0 时,会释放对象; - 新创建的
shared_ptr(this指针创建的)计数减到 0 时,会再次尝试释放已被销毁的对象,导致双重释放(double free) 或未定义行为。
此处我们需要使用enable_shared_from_this<T>继承来进行解决:
- 当类
T继承enable_shared_from_this<T>后,该类会隐式包含一个weak_ptr<T>成员(内部维护)。当T的对象被shared_ptr管理时,这个weak_ptr会与管理该对象的shared_ptr共享控制块(记录引用计数的结构)。
此时,通过调用 shared_from_this() 方法,可返回一个指向自身的 shared_ptr<T>,这个新的 shared_ptr 会复用原有的引用计数,避免双重释放。
服务器类定义:
class Rserver : public std::enable_shared_from_this<Loop_Epollserver>
{
public:
// ......
};
在Connection类中增加一个成员:weak_ptr<Rserver> _loop_svr.
对于创建Connection对象部分也要进行修改:
std::shared_ptr<Connection> conptr(new Connection(listensock,shared_from_this(),
std::bind(&Rserver::A***ept, this, std::placeholders::_1),
nullptr, nullptr));
在第二个实参中,传入this指针来构建Connection中的weak_ptr。
关于线程池部分的代码因为文章篇幅就不再叙述了,大家可以自己试试写一下。