Java NIO 反应堆模式简单模型
一般NIO里反应堆模式都是这样:一个Acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到Socket CHannel注册到按某种算法从Reactor池中取出的一个Reactor上,注册的事件为读,写等,之后这个Socket Channel的所有IO事件都和Acceptor没关系,都由被注册到的那个Reactor来负责。
每个Acceptor和每个Reactor都各自持有一个Selector
当然每个Acceptor和Reactor都得是一个线程(起码在逻辑上得是线程)
简单实现,三个类NioAcceptor、NioReactor和ReactorPool:
package cc.lixiaohui.demo.dp.reator; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去 */ public class NioAcceptor { private int port; private String host; private Selector selector; // Java NIO Selector private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel private ReactorPool reactorPool; // NioReactor池 private Thread thread; // 工作线程 private volatile boolean stop = false; private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class); public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException { this.port = port; this.host = Objects.requireNonNull(host); this.reactorPool = new ReactorPool(reactorPoolSize); selector = Selector.open(); // 创建selector serverChannel = ServerSocketChannel.open(); // new server socket channel serverChannel.configureBlocking(false); // in non-blocking mode serverChannel.bind(new InetSocketAddress(host, port)); // bind serverChannel.register(selector, SelectionKey.OP_ACCEPT); // } public void stop() throws InterruptedException { stop = true; thread.join(); } public void start() { thread = new Thread(new AcceptTask(this)); thread.start(); } private static class AcceptTask implements Runnable { NioAcceptor acceptor; AcceptTask(NioAcceptor acceptor) { this.acceptor = acceptor; } public void run() { final Selector selector = acceptor.selector; Set<SelectionKey> keys = null; while (!acceptor.stop) { // 运行中 try { selector.select(1000L); // select, 最多等1秒 keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { // 可accept SocketChannel channel = acceptor.serverChannel.accept(); channel.configureBlocking(false); // 取下一个Reactor并把SocketChannel加入到Reactor的注册队列 acceptor.reactorPool.nextReactor().postRegistry(channel); } else { key.cancel(); } } } finally { keys.clear(); } } catch (IOException e) { logger.error("", e); } } } } }
/** * Reactor负责SelectionKey.OP_READ | SelectionKey.OP_WRITE等事件 */ public class NioReactor { /** 待注册的{@link SocketChannel} 队列 */ private Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>(); private Selector selector; private volatile boolean stop = false; private Thread thread; private static final Logger logger = LoggerFactory.getLogger(NioReactor.class); public NioReactor() throws IOException { selector = Selector.open(); } public void postRegistry(SocketChannel channel) { registerQueue.add(channel); selector.wakeup(); // 唤醒selector, 以便让其即时处理注册 } public NioReactor start() { thread = new Thread(new ReactTask(this)); thread.start(); return this; } public void stop() throws InterruptedException { stop = true; thread.join(); } /** * 处理队列里面的待注册的SocketChannel */ private void doRegister(Selector selector) { while (!registerQueue.isEmpty()) { SocketChannel channel = registerQueue.poll(); try { // 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { logger.error("", e); } } } private void handleWrite(SelectionKey key) { // TODO 业务写 } private void handleRead(SelectionKey key) { // TODO 业务读 } private static class ReactTask implements Runnable { NioReactor reactor; ReactTask(NioReactor reactor) { this.reactor = reactor; } public void run() { Set<SelectionKey> keys = null; while (!reactor.stop) { final Selector selector = reactor.selector; try { selector.select(500L); reactor.doRegister(selector); // 处理注册 keys = selector.selectedKeys(); for (SelectionKey key : keys) { try { if (!key.isValid()) { // not valid key.cancel(); continue; } if (key.isReadable()) { // 可读 reactor.handleRead(key); } if (key.isWritable()) { // 可写 reactor.handleWrite(key); } } catch (Throwable t) { logger.error("", t); continue; } } } catch (IOException e) { logger.error("", e); } } } } }
ReactorPool用来管理Reactor:
public class ReactorPool extends LinkedList<NioReactor>{ private static final long serialVersionUID = 6525233920805533099L; private final int capacity; public ReactorPool(int size) { this.capacity = size; } // 轮询算法取下一个Reactor public NioReactor nextReactor() throws IOException { // 新建或从头部拿一个Reactor NioReactor reactor = size() < capacity ? new NioReactor().start() : poll(); add(reactor);// 加到尾部 return reactor; } }
参考:
Netty的NIO模型
Mycat的NIO实现
相关推荐
- 异步I/O的一种实现方式,区别于反应堆模式。 - 应用程序不直接对I/O请求进行处理,而是提交一个描述了I/O操作的异步操作给操作系统,由操作系统来完成实际的I/O操作,然后通过回调、事件通知或者其它方式通知...
9. **设计模式**:23种GOF设计模式是Java开发者应掌握的经典模式,它们提供了解决常见问题的最佳实践。 10. **Spring框架**:作为Java EE领域的主流框架,Spring提供了依赖注入、AOP(面向切面编程)、MVC(模型-...
Java知识管理是一个广泛的领域,它...以上只是Java知识管理的一部分,随着技术的发展,新的概念和技术如模块化(Jigsaw)、Lambda表达式、反应式编程等不断涌现,持续学习和更新知识是每个Java开发者必须面对的挑战。
在Java编程领域,进阶意味着深入理解语言特性、优化代码性能、掌握高级设计模式和框架。以下是一些关键的Java进阶知识点,旨在帮助你提升技能水平,成为一名更优秀的Java开发者。 1. **多线程**:Java是多线程编程...
7. **JVM内部机制**:深入解析Java虚拟机的工作原理,包括类加载机制、内存模型(堆、栈、方法区等)、垃圾收集算法和调优策略。 8. **Java性能优化**:讲解如何通过代码优化、配置调整等方式提升Java应用的运行...
10. **最新Java特性**:随着Java版本的更新,新特性不断引入,如Lambda表达式、Stream API、Optional类、模块化系统(Jigsaw)、反应式编程等。了解并能实际运用这些新特性,将使你在面试中更具竞争力。 掌握以上...
5. **IO/NIO**:流的分类、字符编码、缓冲区、过滤器,以及Java NIO(非阻塞I/O)的优势和使用。 6. **网络编程**:TCP/UDP协议,Socket编程,HTTP/HTTPS协议的理解及其应用。 7. **设计模式**:单例、工厂、抽象...
- **多路复用**:了解NIO(非阻塞I/O)和Java NIO.2框架,如Selector和Channel。 8. **设计模式** - **单例模式**:了解多种实现方式,如饿汉式、懒汉式、双重检查锁定等。 - **工厂模式**:理解简单工厂、工厂...
理解Java虚拟机(JVM)的工作原理,包括类加载机制、内存区域(堆、栈、方法区等)、垃圾回收(GC)以及性能调优,对于解决内存泄漏和性能问题非常重要。 七、反射与动态代理 Java反射机制允许在运行时检查类、接口...
Netty采用了Reactor模式,也称为反应器模式,这是一种多路复用的事件驱动模型。它将I/O事件与处理逻辑分离,通过事件循环(EventLoop)来轮询并处理待处理的事件,实现了高效的并发处理。在Netty中,EventLoopGroup...
9. **异步处理**:通过异步非阻塞IO(如NIO)和反应式编程,可以在处理高并发请求时,减少等待时间,提高系统吞吐量。 10. **代码重构**:定期进行代码审查和重构,消除冗余代码,减少不必要的计算和资源消耗,保持...
它基于Java的非阻塞I/O(NIO)模式,通过`Selector`来处理客户端的连接事件。 - `Acceptor`线程循环调用`selector.select(1000L)`来等待新连接的到来。`1000L`表示超时时间为1毫秒,如果没有连接到达,它将在1毫秒...
5. **IO/NIO**:Java的输入输出(IO)和非阻塞I/O(NIO)库提供了处理文件、网络数据传输的能力。 6. **多线程**:Java提供了内置的多线程支持,理解线程的创建、同步、通信对于构建并发程序至关重要。 7. **JVM**...
- **JVM内存模型**: 包括堆内存、栈内存、方法区、本地方法栈等区域的用途和大小调整。 3. **数据类型转换**: - **基本类型转换规则**: 如何在整型之间转换,以及与包装类之间的转换。 4. **集合框架**: - **...
更高版本的Java还引入了更多新特性,如模块系统、反应式编程等,也可能在示例中有所体现。 以上只是部分可能包含的知识点,具体"blog-examples-master"中的内容还需解压文件后查看。这些示例代码对于学习和提升Java...
11. **Java 8及更新版本**:Java 8引入了Lambda表达式、Stream API和日期时间API等新特性,后续版本也不断添加新功能,如模块系统(Project Jigsaw)、反应式编程支持(Project Reactor)等。 12. **测试**:单元...
可能采用了线程池、非阻塞I/O(NIO)、反应式编程模型(如Reactor或Akka)等技术,以减少上下文切换和等待时间。 2. **数据结构与算法**:在高频交易中,数据处理速度是关键。项目可能使用了优化的数据结构,如队列...
8. **设计模式**:单例、工厂、观察者、装饰器、适配器等常见设计模式的应用。 9. **JVM 内存模型**:堆内存、栈内存、方法区、垃圾收集机制等。 10. **I/O 流**:NIO(New I/O)和 AIO(Asynchronous I/O)非阻塞...
- **服务雪崩**:大量服务崩溃导致的连锁反应。 - **服务限流**:限制进入服务的流量。 - **服务熔断**:当服务出现问题时,直接拒绝调用,防止故障传播。 - **服务降级**:在发生故障时提供简化版本的服务。 #...