1.Reactor模式的重要性
1.1 为啥要首先学Reactor模式
- 到目前为止,高性能网络编程都绕不开反应器模式。很多著名的服务器软件或者中间件都是基于反应器模式实现的。比如Nginx、Redis、***ty等。
- 从开发的角度来说,如果要完成和胜任高性能的服务器开发,反应器模式是必须学会和掌握的。
- 从学习的角度来说,反应器模式相当于高性能、高并发的一项非常重要的基础知识,只有掌握了它,才能真正理解和掌握Nginx、Redis、***ty等这些大名鼎鼎的中间件技术。
- 在大的互联网公司如阿里、腾讯、京东的面试过程中,反应器模式相关的问题是经常出现的面试问题。
1.2 Reactor模式简介
Reactor反应器模式由Reactor反应器线程、Handlers处理器两大角色组成,两大角色的职责分别如下:
- Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。
- Handlers处理器的职责:非阻塞的执行业务处理逻辑。
Reactor线程负责多路I/O事件的查询,然后分发到一个或者多个Handler处理器完成I/O处理,所以,Reactor模式也叫Dispatcher模式。总之,Reactor模式和操作系统底层的IO多路复用模型相互结合,是编写高性能网络服务器的必备技术之一。
1.3 多线程OIO的致命缺陷
在Java的OIO编程中,最原始的网络服务器程序,一般是用一个while循环,不断地监听端口是否有新的连接。如果有,那么就调用一个处理函数来完成传输处理,示例代码如下:
while(true){
socket = a***ept(); //阻塞,接收连接
handle(socket);//读取数据、业务处理、写入结果
}
这种方法的最大问题是:如果前一个网络连接的handle(socket)没有处理完,那么后面的新连接没法被服务端接收,于是后面的请求就会被阻塞住,这样就导致服务器的吞吐量太低。这对于服务器来说,这是一个严重的问题。
为了解决这个严重的连接阻塞问题,出现了一个极为经典模式:Connection Per Thread(一个线程处理一个连接)模式。示例代码如下:
public class ConnectionPerThread implements Runnable{
@Override
public void run() {
try {
//服务器监听 socket
ServerSocket serverSocket =
new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
while (!Thread.interrupted()) {
Socket socket = serverSocket.a***ept();
//接收一个连接后,为 socket 连接,新建一个专属的处理器对象
Handler handler = new Handler(socket);
//创建新线程,专门负责一个连接的处理
new Thread(handler).start();
}
} catch (IOException ex) { /* 处理异常 */ }
}
//处理器,这里将内容回显到客户端
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
@Override
public void run() {
while (true) {
try {
byte[] input = new byte[1024];
/* 读取数据 */
socket.getInputStream().read(input);
/* 处理业务逻辑,获取处理结果*/
byte[] output =null;
/* 写入结果 */
socket.getOutputStream().write(output);
} catch (IOException ex) { /*处理异常*/ }
}
}
}
}
以上示例代码中,对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的socket连接的输入和输出。当然,服务器的监听线程也是独立的,任何的socket连接的输入和输出处理,不会阻塞到后面新socket连接的监听和建立,这样,服务器的吞吐量就得到了提升。早期版本的Tomcat服务器,就是这样实现的。
Connection Per Thread模式(一个线程处理一个连接)的优点是:
- 解决了前面的新连接被严重阻塞的问题,在一定程度上,较大的提高了服务器的吞吐量。
Connection Per Thread模式的缺点是:
- 对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。
- 在系统中,线程是比较昂贵的系统资源。如果线程的数量太多,系统无法承受。而且,线程的反复创建、销毁、线程的切换也需要代价。因此,在高并发的应用场景下,多线程OIO的缺陷是致命的。
如何减少线程数量,比如说让一个线程同时负责处理多个socket连接的输入和输出,行不行呢?
看上去,没有什么不可以。但是,实际上作用不大。为什么呢?传统OIO编程中每一次socket传输的IO读写处理,都是阻塞的。在同一时刻,一个线程里只能处理一个socket的读写操作,前一个socket操作被阻塞了,其他连接的IO操作同样无法被并行处理。所以在OIO中,即使是一个线程同时负责处理多个socket连接的输入和输出,同一时刻,该线程也只能处理一个连接的IO操作。
如何解决Connection Per Thread模式的巨大缺陷呢?一个有效途径是:使用Reactor反应器模式。用反应器模式对线程的数量进行控制,做到一个线程处理大量的连接。
2.单线程Reactor模式
总体来说,Reactor反应器模式有点儿类似事件驱动模式。在事件驱动模式中,当有事件触发时,事件源会将事件dispatch分发到handler处理器,由处理器负责事件处理。而反应器模式中的反应器角色,类似于事件驱动模式中的dispatcher事件分发器角色。
在反应器模式中,有Reactor反应器和Handler处理器两个重要的组件:
- Reactor反应器:负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中选择器查询出来的通道IO事件。
- Handler处理器:与IO事件(或者选择键)绑定,负责IO事件的处理。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。
2.1 什么是单线程Reactor反应器
什么是单线程版本的Reactor反应器模式呢?
简单地说,Reactor反应器和Handers处理器处于一个线程中执行。它是最简单的反应器模型
基于Java NIO如何实现简单的单线程版本的反应器模式呢?
需要用到SelectionKey选择键的几个重要的成员方法:
//此方法可以将任何的Java POJO对象,作为附件添加到SelectionKey实例。这方法非常重
//要,因为单线程版本的Reactor反应器模式实现中,可以将Handler处理器实例,作为附件添
//加到SelectionKey实例。
void attach(Object o); //将对象附加到选择键
//此方法与attach(Object o)是配套使用的,其作用是取出之前通过attach(Object o)方法添加
//到SelectionKey选择键实例的附加对象。这个方法同样非常重要,当IO事件发生时,选择键
//将被select方法查询出来,可以直接将选择键的附件对象取出。
Object attachment();// 从选择键获取附加对象
在Reactor模式实现中,通过attachment() 方法所取出的,是之前通过attach(Object o)方法绑定的Handler实例,然后通过该Handler实例,完成相应的传输处理。
总之,在反应器模式中,需要进行attach和attachment结合使用:在选择键注册完成之后,调用attach方法,将Handler实例绑定到选择键;当IO事件发生时,调用attachment方法,可以从选择键取出Handler实例,将事件分发到Handler处理器中,完成业务处理。
2.2 单线程Reactor反应器实现代码
EchoServerReactor实现代码如下:
public class EchoServerReactor implements Runnable{
Selector selector;
ServerSocketChannel serverSocketChannel;
//构造函数
EchoServerReactor() throws IOException {
// 获取selector
selector = Selector.open();
// 获取通道
serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new I***SocketAddress(8080));
//注册 serverSocket 的 a***ept 新连接接收事件
SelectionKey sk =serverSocketChannel.register(selector,
SelectionKey.OP_A***EPT);
//将新连接处理器作为附件,绑定到 sk 选择键
// 在注册serverSocket服务监听连接的接受事件之后,创建一个A***eptorHandler新连接
// 处理器的实例作为附件,被附加(attach)到了SelectionKey中。
sk.attach(new A***eptorHandler());
}
@Override
public void run() {
//选择器轮询
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//反应器负责 dispatch 收到的事件
SelectionKey sk= (SelectionKey) it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) { ex.printStackTrace(); }
}
//反应器的分发事件
void dispatch(SelectionKey k) {
// 当新连接事件发生后,取出了之前attach到SelectionKey中的Handler业务处理器,进行
// socket的各种IO处理。
Runnable handler = (Runnable) (k.attachment());
//调用之前绑定到选择键的 handler 处理器对象
if (handler != null) {
handler.run();
}
}
// 处理器:处理新连接
// 处理器A***eptorHandler的两大职责:一是完成新连接的接收工作,二是在为新连接创建
// 一个负责数据传输的Handler处理器,称之为EchoHandler。
class A***eptorHandler implements Runnable {
@Override
public void run() {
//接受新连接
SocketChannel channel = null;
try {
// 接受新连接
channel = serverSocketChannel.a***ept();
} catch (IOException e) {
throw new RuntimeException(e);
}
//需要为新连接,创建一个输入输出的 handler 处理器
if (channel != null) {
try {
// EchoHandler就是负责socket连接的数据输入、业务处理、结果输出
new EchoHandler(selector, channel);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
EchoHandler代码实现如下:
public class EchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
//构造方法
EchoHandler (Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//与之前的注册方式不同,先仅仅取得选择键,之后再单独设置感兴趣的 IO 事件
sk = channel.register(selector, 0); //仅仅取得选择键
//将 Handler 处理器作为选择键的附件
sk.attach(this);
//注册读写就绪事件
sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
@Override
public void run() {
//...处理输入和输出
}
}
2.3 单线程Reactor反应器模式的EchoServer实践案例
EchoServer的功能很简单:读取客户端的输入,回显到客户端,所以也叫回显服务器。基于Reactor反应器模式来实现,
设计3个重要的类:
- 设计一个反应器类:EchoServerReactor类
- 设计两个处理器类:A***eptorHandler新连接处理器、EchoHandler回显处理器设计两个处理器类A***eptorHandler新连接处理器、EchoHandler回显处理器
反应器类EchoServerReactor、A***eptorHandler的实现代码如下:
public class EchoServerReactor implements Runnable{
Selector selector;
ServerSocketChannel serverSocketChannel;
//构造函数
EchoServerReactor() throws IOException {
// 获取selector
selector = Selector.open();
// 获取通道
serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new I***SocketAddress(8080));
//注册 serverSocket 的 a***ept 新连接接收事件
SelectionKey sk =serverSocketChannel.register(selector,
SelectionKey.OP_A***EPT);
//将新连接处理器作为附件,绑定到 sk 选择键
// 在注册serverSocket服务监听连接的接受事件之后,创建一个A***eptorHandler新连接
// 处理器的实例作为附件,被附加(attach)到了SelectionKey中。
sk.attach(new A***eptorHandler());
}
//轮询和分发事件
@Override
public void run() {
//选择器轮询
try {
while (!Thread.interrupted()) {
selector.select(1000);
Set<SelectionKey> selected = selector.selectedKeys();
if (selected.isEmpty()) {
continue;
}
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
//反应器负责 dispatch 收到的事件
SelectionKey sk= it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) { ex.printStackTrace(); }
}
//反应器的分发事件
void dispatch(SelectionKey k) {
// 当新连接事件发生后,取出了之前attach到SelectionKey中的Handler业务处理器,进行
// socket的各种IO处理。
Runnable handler = (Runnable) (k.attachment());
//调用之前绑定到选择键的 handler 处理器对象
if (handler != null) {
handler.run();
}
}
// Handler之一:处理新连接
// 处理器A***eptorHandler的两大职责:一是完成新连接的接收工作,二是在为新连接创建
// 一个负责数据传输的Handler处理器,称之为EchoHandler。
class A***eptorHandler implements Runnable {
@Override
public void run() {
//接受新连接
SocketChannel channel = null;
try {
// 接受新连接
channel = serverSocketChannel.a***ept();
} catch (IOException e) {
throw new RuntimeException(e);
}
//需要为新连接,创建一个输入输出的 handler 处理器
if (channel != null) {
try {
// EchoHandler就是负责socket连接的数据输入、业务处理、结果输出
new EchoHandler(selector, channel);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
第二个处理器为EchoHandler回显处理器,也是一个传输处理器,主要是完成客户端的内容读取和回显,具体如下:
public class EchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//处理器实例的状态:发送和接收,一个连接对应一个处理器实例
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler (Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//与之前的注册方式不同,先仅仅取得选择键,之后再单独设置感兴趣的 IO 事件
sk = channel.register(selector, 0); //仅仅取得选择键
//将 Handler 处理器作为选择键的附件
sk.attach(this);
//注册读写就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
try {
if (state == SENDING) {
//发送状态,写入数据到连接通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer 切换成写入模式
byteBuffer.clear();
//注册 read 就绪事件,开始接收客户端数据
sk.interestOps(SelectionKey.OP_READ);
//修改状态,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//接收状态,从通道读取数据
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer 切换成读取模式
byteBuffer.flip();
//准备写数据到通道,注册 write 就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//注册完成后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭 select key,需要重复使用
//sk.cancel();
}catch (Exception e) {
e.printStackTrace();
}
}
}
以上代码是一个基于反应器模式的EchoServer回显服务器的完整实现。它是一个单线程版本的反应器模式,Reactor反应器和所有的Handler处理器实例的执行,都执行在同一条线程中。
2.4 单线程Reactor反应器模式的缺点
单线程Reactor反应器模式,是基于Java的NIO实现的。相对于传统的多线程OIO,反应器模式不再需要启动成千上万条线程,避免了线程上下文的频繁切换,服务端的效率自然是大大提升了。
在单线程反应器模式中,Reactor反应器和Handler处理器都执行在同一条线程上。这样,带来了一个问题:
- 当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,被阻塞的Handler不仅仅负责输入和输出处理的传输处理器,还包括负责新连接监听的A***eptorHandler处理器,这就可能导致服务器无响应。这个是非常严重的问题。因为这个缺陷,因此单线程反应器模型在生产场景中使用得比较少。
- 除此之外,目前的服务器都是多核的,单线程反应器模式模型不能充分利用多核资源。总之,在高性能服务器应用场景中,单线程反应器模式实际使用的很少。
3.多线程Reactor模式
3.1 多线程版本的Reactor模式演进
多线程Reactor反应器的演进,分为两个方面:
- 首先是升级Reactor反应器。可以考虑引入多个Selector选择器,提升查询和分发大量通道的IO事件的能力。
- 其次是升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以考虑使用线程池。
总体来说,多线程版本的反应器模式,大致如下:
- 将负责数据传输处理的IOHandler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听受到阻塞。
- 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样,充分释放了系统资源的能力;也大大提升反应器管理大量连接,或者监听大量传输通道的能力。
3.2 实战:多线程版本的Reactor反应器
在前面的“回显服务器”(EchoServerReactor)的基础上,完成多线程Reactor 反应器的升级。多线程反应器的实践案例设计如下:
- 引入多个选择器。
- 设计一个新的子反应器(SubReactor)类,一个子反应器负责查询一个选择器的查询、分发。
- 开启多个处理线程,一个处理线程负责执行一个子反应器(SubReactor)的IO事件。为了提升效率,这里由一个线程负责一个SubReactor的所有操作,避免多个线程负责一个选择器,导致需要进行线程同步,从而引发性能下降的问题。
- 进行IO事件的分类隔离。将新连接事件OP_A***EPT的反应处理,和普通的读(OP_READ)事件、写(OP_WRITE)事件反应处理,进行分开隔离。这里,专门用一个SubReactor负责新连接事件查询和分发,防止耗时的IO操作导致新连接事件OP_A***EPT事件查询发生延迟,这个专门的反应器也做bossReactor;与之相对应的、负责IO事件的查询、分发的反应器,叫做workReactor。
- 将IO事件的查询、分发和处理线程隔离。具体来说,就是将Handler处理器的执行,不放在Reactor绑定的线程上完成。实际上,在高并发、高性能的场景下,需要将耗时的处理与IO反应处理进行隔离,耗时的Handler处理器需要在专门的线程上完成,避免IO反应处理被阻塞。
多线程版本的MultiThreadEchoServerReactor反应器的逻辑模型如下图所示:
多线程版本反应器MultiThreadEchoServerReactor 的参考代码,大致如下:
public class MultiThreadEchoServerReactor {
private ServerSocketChannel serverSocket;
private AtomicInteger next = new AtomicInteger(0);
//负责监听的selector
private Selector bossSelector = null;
//引入多个selector选择器
private Selector[] workSelectors = new Selector[2];
private Reactor bossReactor = null;
//引入多个子反应器
private Reactor[] workReactors;
/**
* 构造器
*/
MultiThreadEchoServerReactor() throws IOException {
// 初始化多个selector选择器
bossSelector = Selector.open(); // 用于监听新连接事件
workSelectors[0] = Selector.open(); // 用于监听read、write事件
workSelectors[1] = Selector.open(); // 用于监听read、write事件
serverSocket = ServerSocketChannel.open();
// 绑定监听地址
serverSocket.bind(new I***SocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT));
// 设置Channel非阻塞
serverSocket.configureBlocking(false);
// bossSelector负责监听连接事件,将serverSocker注册到bossSelector
SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_A***EPT);
//绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
sk.attach(new A***eptorHandler());
//bossReactor反应器,处理新连接的bossSelector
bossReactor = new Reactor(bossSelector);
//第一个子反应器,一子反应器负责一个worker选择器
Reactor workReactor1 = new Reactor(workeSelectors[0]);
//第二个子反应器,一子反应器负责一个worker选择器
Reactor workReactor2 = new Reactor(workeSelectors[1]);
workReactors =new Reactor[]{workReactor1, workReactor2};
}
private void startService() {
// 一子反应器对应一条线程
new Thread(bossReactor).start();
new Thread(workReactors[0]).start();
new Thread(workReactors[1]).start();
}
/**
* 反应器内部类
*/
class Reactor implements Runnable{
//每条线程负责一个选择器的查询
final Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
//单位为毫秒
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (null == selectedKeys || selectedKeys.size() == 0) {
continue;
}
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selectedKeys.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
}
class A***eptorHandler implements Runnable{
@Override
public void run() {
try{
SocketChannel channel = serverSocket.a***ept();
Logger.info("接收到一个新的连接");
if (channel != null) {
int index = next.get();
Selector selector = workSelectors[index];
new MultiThreadEchoEchoHandler(selector,channel);
}
} catch (Exception e) {
e.printStackTrace();
}
if (next.incrementAndGet() == workSelectors.length) {
next.set(0);
}
}
}
}
上面是反应器的多线程版本演进代码,总共三个选择器。第一个选择器作为boss,专门负责查询和分发新连接事件;第二个、三个选择器作为worker,专门负责查询和分发IO传输事件。
上面的代码创建了三个子反应器,一个bossReactor负责新连接事件的反应处理(查询、分发、处理),bossReactor和boss选择器进行绑定;两个workReactor负责普通IO事件的查询和分发,分别绑定一个worker选择器。
服务端的监听通道注册到boss选择器,而所有的Socket传输通道通过轮询策略注册到worke选择器,从而实现了新连接监听和IO读写事件监听的线程分离。
3.3 实战:多线程版本的Handler处理器
回显处理器为:MultiThreadEchoHandler。主要的升级是引入了一个线程池(ThreadPool),使得数据传输和业务处理的代码执行在独立的线程池中,彻底地做到IO处理以及业务处理线程和反应器IO事件轮询线程的完全隔离。
实现代码如下:
public class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//唤醒 查询线程,使得OP_READ生效
selector.wakeup();
Logger.info("新的连接 注册完成");
}
public void run() {
//异步任务,在独立的线程池中执行
//提交数据传输任务到线程池
//使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
pool.execute(new MultiThreadEchoHandler.AsyncTask());
}
//异步任务,不在Reactor线程中执行
//数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
4.Reactor反应器模式和优点和缺点
反应器模式和生产者消费者模式对比
- **二者相似之处:**在一定程度上,反应器模式有点类似生产者消费者模式。在生产者消费者模式中,一个或多个生产者将事件加入到一个队列中,一个或多个消费者主动地从这个队列中拉取(Pull)事件来处理。
- 二者不同之处:反应器模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler处理器来处理。
反应器模式和观察者模式(Observer Pattern)对比
- 相似之处在于:在反应器模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步地分发这些IO事件。观察者模式(Observer Pattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时,会通知所有观察者,它们能够执行相应的处理。
- 不同之处在于:在反应器模式中, Handler处理器实例和IO事件(选择键)的订阅关系,基本上是一个事件绑定到一个Handler处理器;每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler处理器,也就是一个事件只能被一个Handler处理;而在观察者模式中,同一个时刻,同一个主题可以被订阅过的多个观察者处理。
作为高性能的IO模式,反应器模式的优点如下:
- 响应快,虽然同一反应器线程本身是同步的,但不会被单个连接的IO操作所阻塞;
- 编程相对简单,最大程度避免了复杂的多线程同步,也避免了多线程的各个进程之间切换的开销;
- 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。
反应器模式的缺点如下:
- 反应器模式增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
- 反应器模式依赖于操作系统底层的IO多路复用系统调用的支持,如Linux中的epoll系统调用。如果操作系统的底层不支持IO多路复用,反应器模式不会有那么高效
- 同一个Handler业务线程中,如果出现一个长时间的数据读写,会影响这个反应器中其他通道的IO处理。例如在大文件传输时,IO操作就会影响其他客户端(Client)的响应时间。因而对于这种操作,还需要进一步对反应器模式进行改进。