自
JDK1.4
后,
Sun
积极推广
New IO
,其中
non-blocking
是新的
socket
编程模式,大幅度提高了服务器端
socket
并发处理能力,
Selector, SocketChannel
和
SelectionKey
这三个类配合使用,可以构成一个比较经典的
Reactor
模式。其中
Selector
类只筛选
socket
通讯中感兴趣的事件,如接受连接,读取数据,写数据等。
SocketChannel
是真正的
socket
通讯的实现者,负责读传到服务器端
socket
数据和往客户端
socket
写数据。
SelectionKey
是对
socket
通讯事件的封装。在实行一个
non-blocking IO
的
reactor
模式过程中,我仍然按照
reactor
模式的定义,设计多路复用分发器,分配器,任务处理程序,并模拟并发请求客户端。
多路复用分发器
DemultiPlexer
继承了
Thread
类,在
signal
方法中,调用
selector.select()
函数,该函数等待一个
SocketChannel
中的事件出现才返回。在
signal
方法中,对两种事件感兴趣,一个是
OP_ACCEPT,
一个是
OP_READ
,当收到
OP_ACCEPT
事件时,把
SelectionKey
中的
channel
中转化成
ServerSocketChannel
,并且掉中
ServerSocketChannel.accpet()
方法,返回一个
SocketChannel
,然后在设置成
non-blocking IO
模式,最后注册
OP_READ
事件。当收到一个
OP_READ
事件时,想
Dispatcher
注册这个事件,让
Dispatcher
去启动一个线程去处理该事件,处理完成后,并把该事件从
Selector
中撤销。
class DemultiPlexer extends Thread {
private Selector selector;
private Dispatcher dispatcher;
@Override
public void run() {
for (;;) {
try {
signal();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void signal() throws IOException, InterruptedException {
int c = selector.select();
if (c <= 0) {
return;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (key.isValid()) {
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
dispatcher.register(key);
}
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel channel = ssc.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
// key.cancel();
}
public DemultiPlexer(Selector selector, Dispatcher dispatcher) {
this.selector = selector;
this.dispatcher = dispatcher;
}
}
分配器
分配器的主要职责是负责监视
SelectionKey
队列之中已经被选取的
OP_READ
事件,如果队列中无任何事件,分配器线程处于中断状态,一旦有新的事件被注册到分配器中,分配器线程会被唤醒,去处理这个事件。为了说明
Reactor
模式,本例中,设计了只要有事件被注册,分配器就新建一个请求处理线程去读取数据这种简单的资源调度机制。在实际应用中,线程、数据库连接,内存等等都是资源,在有限的情况下,分配这些资源都会比这个模型复杂的多。
class Dispatcher extends Thread {
private static LinkedBlockingQueue<SelectionKey> queue = new LinkedBlockingQueue<SelectionKey>();
public void run() {
for (;;) {
try {
dispatch();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void dispatch() throws IOException, InterruptedException {
synchronized (queue) {
if (queue.size() <= 0) {
queue.wait();
} else {
//拿出对列中的第一个事件,创建一个新的任务处理程序
SelectionKey key = queue.poll();
new RequestHandler(key).start();
}
}
}
/**
* 注册新的事件,并唤醒当前线程。
*/
public void register(SelectionKey key) throws IOException {
synchronized (queue) {
queue.add(key);
queue.notify();
}
}
}
请求处理程序
请求处理程序是真正的业务处理代码。本例中主要负责读取客户端
socket
写入的数据,然后把当前的
SelectionKey
取消,让下一次
select
操作忽略掉当前的
SelectionKey.
这个类中使用了
Charset
辅助类来完成把读取的字节反编成一个可读的字符串。
class RequestHandler extends Thread {
private SelectionKey selectionKey;
private static Charset utf8 = Charset.forName("utf-8");
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buff = ByteBuffer.allocate(128);
try {
// make sure the socketchannel is connected.
if (!socketChannel.isConnected())
socketChannel.finishConnect();
// read the data and print it after decode.
while (socketChannel.read(buff) > 0) {
buff.flip();
System.out.println(utf8.decode(buff));
buff.clear();
}
//remove the key for next selection operation
selectionKey.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
public RequestHandler(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
}
请求客户端
一个请求客户端连上服务器端后,往服务器端发送
”Hello non-blocking IO reactor Pattern”
字符串。在
non blocking
IO socket
编程中,客户端的
SocketChannel
也使用
Selector
,并注册感兴趣的事件。本例中注册
OP_CONNECT
事件,然后确保
SocketChannel
是连接上服务器端的,再向服务器端发送一个字符串后,请求客户端的任务完成。
在
main
函数中,模拟了
50
个并发的客户端。
class Request extends Thread {
private int port = 9000;
@Override
public void run() {
Charset utf8 = Charset.forName("utf-8");
try {
SocketChannel channel = SocketChannel.open(new InetSocketAddress(port));
channel.configureBlocking(false);
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
if (!channel.isConnected())
channel.finishConnect();
channel.write(utf8.encode("Hello non-blocking IO reactor pattern.."));
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
for (int i = 0; i < 50; i++)
new Request().start();
}
}
最后启动一个服务器端,来运行这个non-blocking IO的服务器端。在main函数中,首先open一个selector和ServerSocketChannel,配置为non-blocking IO模式,绑定服务端口9000并注册OP_ACCEPT。最后启动多路复用分发器线程和分配器线程。
public class ReactorPattern {
public static void main(String args[]) throws IOException {
Selector sel = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(9000));
ssc.register(sel, SelectionKey.OP_ACCEPT);
Dispatcher disp = new Dispatcher();
disp.start();
new DemultiPlexer(sel, disp).start();
}
}
小结
在non-blocking IO模式中,DemultiPlexer类是担任了筛选事件和注册事件的工作,Selector.select()采用的是blocking方式,调用select方法,都必须等到其有返回值为至,但select方法本身是线程安全的,因此DemultiPlexer类在运行环境下可以存在多个线程,配合多个Dispacher线程一起工作。如
// initialize selector and register OP_ACCEPT
......
Dispatcher disp1 = new Dispatcher();
disp1.start();
new DemultiPlexer(sel, disp1).start();
Dispatcher disp2 = new Dispatcher();
disp2.start();
new DemultiPlexer(sel, disp2).start();
预告
Proactor模式是Reactor模式的兄弟,两者有很多相似之处,Proactor模式有何特点,相对于Reactor模式,它又有什么优点呢,如果您感兴趣,请继续关注我的博客。
[原创内容,版权所有,如有转载,请注明出处,如果您发现文中有什么错误请指出,不胜感激]
分享到:
相关推荐
**Reactor模式(一)** Reactor模式是一种事件驱动的设计模式,它主要用于处理并发I/O操作,通过将I/O事件的处理与事件处理程序解耦,实现高效的异步处理。在高并发环境下,Reactor模式可以显著提升系统性能,因为...
Reactor提供了一种避免这种阻塞的方式,通过使用发布-订阅模式(Publish-Subscribe pattern)或响应式编程模型,使得多个订阅者可以并发地处理事件,而无需锁定。 Reactor的几个关键组件包括: - **Flux**:用于处理...
"Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events." In _Pattern Languages of Program Design_, edited by Jim Coplien and Douglas C. Schmidt, ISBN...
Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients
常见的网络模式有Reactor和Proactor两种,它们在并发处理和异步操作中扮演着关键角色。 **Reactor模式** Reactor模式是一种事件驱动的设计模式,广泛应用于多线程和网络编程。它的核心思想是通过一个中心调度器...
这就是io设计模式---reactor Reactor 是事件驱动的,并使用 os api(select、poll、epoll、kqueue、iocp 等)来调度 socket 事件。 编程只需要向Reactor注册事件源socket发生的读、写或except以及具体的事件处理...
Spring 5是Spring框架的第五个主要版本,提供了对响应式编程模型WebFlux的支持,这一特性是通过与Project Reactor的集成实现的。Spring 5设计模式这本图书,作者是Dinesh Rajput,涵盖了在高效开发Spring 5应用程序...
It covers the Node.js ecosystem and its philosophy, a short introduction to Node.js version 6, ES2015, and the reactor pattern. Chapter 2, Node.js Essential Patterns, introduces the first steps ...
Reactor模式是处理大量并发连接的关键,它通过事件驱动的方式实现了非阻塞I/O,Understanding Reactor Pattern系列文章是深入理解这一模式的好资源。 Lock-Free编程是另一个关键领域,特别是在高并发和高性能系统中...
反应器设计模式(Reactor Pattern)是事件驱动架构中的一种反应式编程模式,通常用于高并发的场景中。它主要用于处理多个事件的异步分发。在反应器模式中,有一个或多个输入源(例如,连接、数据流)和相应的事件...
##### Reactor Pattern 反应器模式 - **基本版本**:反应器模式的基本版本通常包含一个单一的线程负责监听和分发事件。 - **多线程版本**:为了进一步提高性能,可以采用多个线程的形式,其中主线程负责监听事件,...
本文档基于Doug Lea的作品,深入探讨了如何通过事件驱动处理以及反应器模式(Reactor pattern)来构建高效的IO处理系统。 首先,我们需要了解什么是可扩展网络服务。可扩展网络服务是指能够根据服务负载的变化,...
内容涉及事件驱动处理(Event-driven processing)和反应器模式(Reactor pattern),以及多线程版本和其他变种。 2. **非阻塞IO API** - Java NIO的非阻塞操作 - 介绍了非阻塞IO的API,重点在于非阻塞模式下如何...
- **Reactor Pattern**:为了解决上述问题,阿里巴巴采用了Reactor模式。在这种模式下,通过一个或多个线程监听所有连接上的I/O事件,并将这些事件分发给相应的处理程序,从而显著提高了系统的吞吐量和响应速度。 #...
#### 二、IPC SAP(Interprocess Communication Software Abstraction Package) **IPC SAP** 是ACE框架中的一个关键组成部分,主要用于进程间通信。它通过一系列的类来支持不同的通信方式,比如流式通信、数据报...
了解了React堆线程模型后,我们来看看`reactor-pattern-master`这个压缩包可能包含的内容。根据名称猜测,这可能是一个关于Reactor模式的示例项目或者教程资料。其中可能包括: - 代码示例:展示如何在Java中实现...
文档接着介绍了“反应器模式”(Reactor pattern),这是一种广泛用于实现事件驱动模型的架构模式。它包括了基本版本以及多线程版本和其他变体。反应器模式的核心思想是有一个等待事件发生的调度器(通常称为“反应...
3. 反应器模式(Reactor pattern):这是一种用于构建可扩展IO服务的设计模式,它包括基本版本和多线程版本,以及其它变体。反应器模式中,一个或多个输入源被监视,当输入源准备好进行IO操作时,事件会被分发到相应...
Chapter 1, A Short Introduction to Reactive Programming, helps you understand the context, thinking pattern, and principles of reactive programming. Chapter 2, Functional Programming with Kotlin and ...