Reactor模式常用于java nio编程中,跟生产者消费者模式有点类似,可以认为是只有一个线程的生产者消费者模型,netty底层也是使用Reactor模式作为nio部分的开发
一个简单的Reactor模式
Reactor.java
package com.gbcom.protocol.nio.core;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 反应器模式,适合nio编码,类似事件驱动编程方式,适合非阻塞IO
*
* 相比较传统的生产者消费者模式,,由于结合nio,不需要开启多个消费者线程,仅仅需要开启一个Reactor线程进行轮询
* 性能会更高,(reactor 模式 必须结合 nio 也就是 非阻塞的模式使用)
*
*
* netty 使用 reactor模式nio部分的编码
* @author SYZ
* @date 2016-11-1 上午10:52:58
* @version 1.0.0
* @see com.gbcom.protocol.nio.core.Reactor
*/
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector,
SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
/*
* Alternatively, use explicit SPI provider: SelectorProvider p =
* SelectorProvider.provider(); selector = p.openSelector(); serverSocket =
* p.openServerSocketChannel();
*/
// class Reactor continued
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select(); //select这个函数是block,
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) { /* ... */
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
// class Reactor continued //接收者处理方法
class Acceptor implements Runnable {
// inner
public void run() {
try {
SocketChannel c = serverSocket.accept();//1.select()方法仅仅通知有事件到来, 真正的接受 还是要使用 accept
//每个客户端 应该只执行一次,也就是 new Handler 会重新注册到该select中,以后不会再重复创建了。。。(待测试)
if (c != null)
new Handler(selector, c);
} catch (IOException ex) { /* ... */
}
}
}
/**
* Test: (Reactor.main)
* @param args
* @throws IOException
*/
public static void main(String args[]) throws IOException{
System.out.println(SelectionKey.OP_ACCEPT);
Reactor reactor = new Reactor(1107);
Thread t = new Thread(reactor);
// t.setDaemon(true);
t.start();
}
}
Handler.java
package com.gbcom.protocol.nio.core;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* 请求的处理,sk.attach(this);是关键,,也就是直接添加到 选择器中,,如果有事件到来,,直接会调用run方法
*
* 注意 该方法并没有开启线程处理
*
* @author SYZ
* @date 2016-11-1 上午11:08:04
* @version 1.0.0
* @see com.gbcom.protocol.nio.core.Handler
*/
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(0);
ByteBuffer output = ByteBuffer.allocate(100);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);//把channel 注册到选择器中
sk.attach(this);// 添加到 select中,公用 reactor中的selector线程
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();// 立即执行
}
boolean inputIsComplete() {
return false; /* ... */
}
boolean outputIsComplete() {
return false; /* ... */
}
void process() {
System.out.println("process!");
/* ... */
}
// class Handler continued
public void run() {
try {
if (state == READING)
read();//IO的read是非阻塞的,但是逻辑是同步,所以有耗时的可能
else if (state == SENDING)
send();
} catch (IOException ex) { /* ... */
}
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
// 设计模式方式
/*
* sk.attach(new Sender()); sk.interest(SelectionKey.OP_WRITE);
* sk.selector().wakeup();
*/
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
/**
* 简单封装,并没有使用
* @author SYZ
* @date 2016-11-1 上午11:21:38
* @version 1.0.0
* @see com.gbcom.protocol.nio.core.Sender
*/
class Sender implements Runnable {
public void run() { // ...
try {
socket.write(output);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (outputIsComplete())
sk.cancel();
}
}
}
以上模式在正常非阻塞IO的情况效果还可以,但是要提供效率,可以使用多线程处理 io之后的业务逻辑,下面看一下多线程Reactor模式
多线程Reactor模式
ReactorWithPool.java
package com.gbcom.protocol.nio.core;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 反应器的多线程版本,
*
* http://gee.cs.oswego.edu Multiple Reactor Threads " Using Reactor Pools Use
* to match CPU and IO rates Static or dynamic construction " Each with own
* Selector, Thread, dispatch loop Main acceptor distributes to other reacto
*
*
* 选择器也会资源紧张(未深究),所以可以创建多个
* @author SYZ
* @date 2016-11-1 上午11:46:49
* @version 1.0.0
* @see com.gbcom.protocol.nio.core.ReactorWithPool
*/
class ReactorWithPool implements Runnable {
final Selector mainSelector;
Selector[] subselectors = new Selector[10];//业务逻辑相关的选择器,,如何创建??
// also create threads
int next = 0;
final ServerSocketChannel serverSocket;
ReactorWithPool(int port) throws IOException {
mainSelector = Selector.open();
for(int i=0;i<10;i++){ //add by myself
subselectors[i] = Selector.open();
}
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(mainSelector,
SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
/*
* Alternatively, use explicit SPI provider: SelectorProvider p =
* SelectorProvider.provider(); selector = p.openSelector(); serverSocket =
* p.openServerSocketChannel();
*/
// class Reactor continued
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
mainSelector.select();
Set selected = mainSelector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) { /* ... */
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
// class Reactor continued //接收者处理方法
class Acceptor implements Runnable {
// inner
public void run() {
try {
SocketChannel connection = serverSocket.accept();
if (connection != null)
new Handler(subselectors[next], connection);
if (++next == subselectors.length)
next = 0;
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* Test: (Reactor.main)
*
* @param args
* @throws IOException
*/
public static void main(String args[]) throws IOException {
System.out.println(SelectionKey.OP_ACCEPT);
ReactorWithPool reactor = new ReactorWithPool(1107);
Thread t = new Thread(reactor);
t.setDaemon(true);
t.start();
}
}
HandlerWithPool.java
package com.gbcom.protocol.nio.core;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
* 使用线程池处理read之后的事情,主要包括业务逻辑,
* 注意是处理IO read完之后的事情,不是处理IO的读写操作,适用于业务逻辑比较复杂的情况,
*
* 解决逻辑耗时问题
*
* @author SYZ
* @date 2016-11-1 上午11:33:03
* @version 1.0.0
* @see com.gbcom.protocol.nio.core.HandlerWithPool
*/
public class HandlerWithPool implements Runnable{
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor();
static final int PROCESSING = 3;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(0);
ByteBuffer output = ByteBuffer.allocate(100);
static final int READING = 0, SENDING = 1;
int state = READING;
HandlerWithPool(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);// 添加到 select中,公用 reactor中的selector线程
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();// 立即执行
}
boolean inputIsComplete() {
return false; /* ... */
}
boolean outputIsComplete() {
return false; /* ... */
}
void process() {
System.out.println("process!");
/* ... */
}
// class Handler continued
public void run() {
try {
if (state == READING)
read();
else if (state == SENDING)
send();
} catch (IOException ex) { /* ... */
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
synchronized void read() throws IOException, InterruptedException {
socket.read(input);//非阻塞,模式,直接获取了数据,
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interestOps(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
/**
* 简单封装,并没有使用
* @author SYZ
* @date 2016-11-1 上午11:21:38
* @version 1.0.0
* @see com.gbcom.protocol.nio.core.Sender
*/
class Sender implements Runnable {
public void run() { // ...
try {
socket.write(output);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (outputIsComplete())
sk.cancel();
}
}
}
分享到:
相关推荐
本示例代码旨在帮助开发者理解如何使用Java NIO和Reactor模式构建网络服务。尽管测试代码可能不完全准确,但它能展示基本的原理和工作流程。 首先,我们要了解什么是Reactor模式。Reactor模式是一种事件驱动的设计...
总结,Java IO的发展经历了从BIO到NIO的转变,再到Reactor模式的优化,每一步都是为了提高系统的并发处理能力和资源利用率。理解并熟练掌握这些IO模型,对于开发高效、稳定的服务器端程序至关重要。通过对压缩包中的...
Reactor模式和NIO是两种在网络编程中广泛使用的并发处理技术。Reactor模式是一种事件驱动的设计模式,主要用于解决高并发场景下的系统设计问题,而Java的NIO(Non-blocking Input/Output,非阻塞I/O)是Java平台提供...
高性能Server-Reactor模型是构建高效能网络服务的核心设计模式,尤其在Java领域中广泛应用。该模型主要用于解决C/S(客户端/服务器)架构中的高并发、低延迟问题,特别是在现代互联网应用如电子邮件、视频流媒体、...
Reactor模式的主要工作流程是,事件分离器一直等待事件的发生,一旦检测到一个或多个I/O事件,就将其分发给相应的事件处理器进行处理。这种方式可以充分利用单线程的效率,用一个线程来处理所有I/O事件,提高了性能...
依据 Doug Lea 的 基于 NIO 实现的 Reacotr 模式的回显服务器 BasicHandler: 单线程处理器 MultiReactor: 主从 Reactor MultithreadHandler: 线程池处理器 Reactor: 接收连接,I/O 读写 Reactor 模型的说明: ...
反应器设计模式(Reactor Pattern)是事件驱动架构中的一种反应式编程模式,通常用于高并发的场景中。它主要用于处理多个事件的异步分发。在反应器模式中,有一个或多个输入源(例如,连接、数据流)和相应的事件...
标题《Scalable IO in Java》和描述表明本文档讨论...通过理解Reactor模式以及如何在Java中通过NIO API实现这一模式,开发者可以构建出能够处理大量并发连接的应用程序,并达到高可伸缩性、高可用性以及高性能的目标。
在Java中,NIO(Non-blocking I/O)框架就是Reactor模式的一个典型应用,它允许一个线程处理多个通道(Channel)上的I/O事件,而不是为每个连接创建一个单独的线程。NIO中的Selector组件就是Reactor的核心,它能够...
- Reactor模式是一种事件驱动的设计模式,用于处理多个并发输入请求。在这个群聊项目中,Reactor采用单线程模型,意味着所有事件(如新的连接、读写事件)都在同一个线程中处理,这简化了同步问题,但可能限制了...
本项目深入探讨了Java网络编程中的多种模式,包括BIO(阻塞IO)、NIO(非阻塞IO)、IO多路复用(select、poll、epoll)、Reactor模式,以及零拷贝技术。通过这些实现,项目展示了如何在高并发环境下优化网络通信效率...
可以作为NIO socket入门的例子,Reactor模式,重点理解key.attach, jar文件里包含了源代码 1,运行server.bat启动服务器,可以打开编辑,修改端口号 2,运行client.bat启动客户端,可以打开编辑,ip,和端口号 3...
3. **Reactor模式**:在NIO中,Reactor模式是一种处理多个客户端连接的并发设计模式。它通过一个线程来监听和接受新的连接,然后分派事件到相应的处理器。相比于传统的多线程模型,Reactor模式可以更有效地管理大量...
在Java中,NIO(非阻塞I/O)库为实现Reactor模式提供了基础。Netty基于NIO,并在其之上构建了一套高效且易用的网络编程框架。 Netty中的Reactor分为两个主要部分:主线程(BossGroup)和工作线程(WorkerGroup)。...
然而,Reactor模式的一个主要缺点是,当事件处理逻辑复杂时,可能导致回调地狱,增加了调试和维护的难度。 Observer和Reactor模式虽然都是处理事件和状态变化,但它们在实际应用中有各自的优势和适用场景。Observer...
实施的想法: select()的专用线程-React堆模式,通过特殊的WaitStrategy实现为一个中断实例。 N个线程(即处理器)处理IO事件。 一个NIO通道的处理始终在一个线程中进行(这意味着代码不必是线程安全的->性能提高...
本篇文章将详细介绍Reactor Netty的关键组件、工作原理以及如何在实际项目中应用。 一、Reactor Netty的架构 Reactor Netty的核心架构基于Netty框架,Netty是一个异步事件驱动的网络应用程序框架,适用于快速开发...
- **Reactor模式**:这是一种常见的设计模式,用于实现NIO网络框架。它包含以下核心组件: - **SynchronousEventDemultiplexer**:负责事件循环和事件分离。 - **Dispatcher**:事件派发器,负责将事件分发给对应...
Java NIO的Reactor模式是实现高并发的关键。Reactor模式是一种事件驱动的架构模式,它具有事件分离器、事件处理器以及复用的事件队列。事件分离器负责监听事件的发生,事件处理器对相应的事件做出响应。在Java NIO中...
与传统的阻塞 I/O 相比,NIO 的设计思想更为先进,它采用了 Reactor 模式的事件驱动机制来实现非阻塞 I/O。通过这种方式,Java NIO 能够显著提升 I/O 性能,并且在某些场景下的性能甚至可以与 C 语言媲美。 **...