`

Java NIO 反应堆模式

阅读更多

Java NIO 反应堆模式简单模型

一般NIO里反应堆模式都是这样:一个Acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到Socket CHannel注册到按某种算法从Reactor池中取出的一个Reactor上,注册的事件为读,写等,之后这个Socket Channel的所有IO事件都和Acceptor没关系,都由被注册到的那个Reactor来负责。

 

每个Acceptor和每个Reactor都各自持有一个Selector

 

当然每个Acceptor和Reactor都得是一个线程(起码在逻辑上得是线程)

 

简单实现,三个类NioAcceptor、NioReactor和ReactorPool:

 

package cc.lixiaohui.demo.dp.reator;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Objects;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去
 */
public class NioAcceptor {
	
	private int port;
	
	private String host;
	
	private Selector selector; // Java NIO Selector
	
	private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel
	
	private ReactorPool reactorPool; // NioReactor池
	
	private Thread thread; // 工作线程
	
	private volatile boolean stop = false;
	
	private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class);
	
	public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException {
		this.port = port;
		this.host = Objects.requireNonNull(host);
		this.reactorPool = new ReactorPool(reactorPoolSize);
		
		selector = Selector.open(); // 创建selector
		serverChannel = ServerSocketChannel.open(); // new server socket channel
		serverChannel.configureBlocking(false); // in non-blocking mode
		serverChannel.bind(new InetSocketAddress(host, port)); // bind
		serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 
	}
	
	public void stop() throws InterruptedException {
		stop = true;
		thread.join();
	}
	
	public void start() {
		thread = new Thread(new AcceptTask(this));
		thread.start();
	}
	
	private static class AcceptTask implements Runnable {
		
		NioAcceptor acceptor;
		
		AcceptTask(NioAcceptor acceptor) {
			this.acceptor = acceptor;
		}

		public void run() {
			final Selector selector = acceptor.selector;
			Set<SelectionKey> keys = null;
			while (!acceptor.stop) { // 运行中
				try {
					selector.select(1000L); // select, 最多等1秒
					keys = selector.selectedKeys();
					try {
						for (SelectionKey key : keys) {
							if (key.isValid() && key.isAcceptable()) { // 可accept
								SocketChannel channel = acceptor.serverChannel.accept();
								channel.configureBlocking(false);
								// 取下一个Reactor并把SocketChannel加入到Reactor的注册队列
								acceptor.reactorPool.nextReactor().postRegistry(channel);
							} else {
								key.cancel();
							}
						}
					} finally {
						keys.clear();
					}
				} catch (IOException e) {
					logger.error("", e);
				}
			}
		}
	}
}
/**
 * Reactor负责SelectionKey.OP_READ | SelectionKey.OP_WRITE等事件
 */
public class NioReactor {
	
	/** 待注册的{@link SocketChannel} 队列 */
	private Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>();
	
	private Selector selector;
	
	private volatile boolean stop = false;
	
	private Thread thread;
	
	private static final Logger logger = LoggerFactory.getLogger(NioReactor.class);
	
	public NioReactor() throws IOException {
		selector = Selector.open();
	}
	
	public void postRegistry(SocketChannel channel) {
		registerQueue.add(channel);
		selector.wakeup(); // 唤醒selector, 以便让其即时处理注册
	}
	
	public NioReactor start() {
		thread = new Thread(new ReactTask(this));
		thread.start();
		return this;
	}
	
	public void stop() throws InterruptedException {
		stop = true;
		thread.join();
	}
	
	/**
	 * 处理队列里面的待注册的SocketChannel
	 */
	private void doRegister(Selector selector) {
		while (!registerQueue.isEmpty()) {
			SocketChannel channel = registerQueue.poll();
			try {
				// 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件
				channel.register(selector, SelectionKey.OP_READ); 
			} catch (ClosedChannelException e) {
				logger.error("", e);
			}
		}
	}
	
	private void handleWrite(SelectionKey key) {
		// TODO 业务写
	}

	private void handleRead(SelectionKey key) {
		// TODO 业务读
	}
	
	private static class ReactTask implements Runnable {
		
		NioReactor reactor;
		
		ReactTask(NioReactor reactor) {
			this.reactor = reactor;
		}
		
		public void run() {
			Set<SelectionKey> keys = null;
			while (!reactor.stop) {
				final Selector selector = reactor.selector;
				try {
					selector.select(500L);
					reactor.doRegister(selector); // 处理注册
					keys = selector.selectedKeys();
					
					for (SelectionKey key : keys) {
						try {
							if (!key.isValid()) { // not valid
								key.cancel();
								continue;
							}
							if (key.isReadable()) { // 可读
								reactor.handleRead(key);
							}
							if (key.isWritable()) { // 可写
								reactor.handleWrite(key);
							}
						} catch (Throwable t) {
							logger.error("", t);
							continue;
						}
					}
				} catch (IOException e) {
					logger.error("", e);
				}
			}
		}
	}
 }

 

ReactorPool用来管理Reactor:

public class ReactorPool extends LinkedList<NioReactor>{
	
	private static final long serialVersionUID = 6525233920805533099L;
	
	private final int capacity;
	
	public ReactorPool(int size) {
		this.capacity = size;
	}
	// 轮询算法取下一个Reactor
	public NioReactor nextReactor() throws IOException {
		// 新建或从头部拿一个Reactor
		NioReactor reactor = size() < capacity ? new NioReactor().start() : poll();
		add(reactor);// 加到尾部
		return reactor;
	}

}

 

参考:

Netty的NIO模型

Mycat的NIO实现

0
1
分享到:
评论

相关推荐

    nio

    - 异步I/O的一种实现方式,区别于反应堆模式。 - 应用程序不直接对I/O请求进行处理,而是提交一个描述了I/O操作的异步操作给操作系统,由操作系统来完成实际的I/O操作,然后通过回调、事件通知或者其它方式通知...

    java必备宝典(经典必备)

    9. **设计模式**:23种GOF设计模式是Java开发者应掌握的经典模式,它们提供了解决常见问题的最佳实践。 10. **Spring框架**:作为Java EE领域的主流框架,Spring提供了依赖注入、AOP(面向切面编程)、MVC(模型-...

    java知识管理.pdf

    Java知识管理是一个广泛的领域,它...以上只是Java知识管理的一部分,随着技术的发展,新的概念和技术如模块化(Jigsaw)、Lambda表达式、反应式编程等不断涌现,持续学习和更新知识是每个Java开发者必须面对的挑战。

    Java进阶诀窍

    在Java编程领域,进阶意味着深入理解语言特性、优化代码性能、掌握高级设计模式和框架。以下是一些关键的Java进阶知识点,旨在帮助你提升技能水平,成为一名更优秀的Java开发者。 1. **多线程**:Java是多线程编程...

    oracle的javappt

    7. **JVM内部机制**:深入解析Java虚拟机的工作原理,包括类加载机制、内存模型(堆、栈、方法区等)、垃圾收集算法和调优策略。 8. **Java性能优化**:讲解如何通过代码优化、配置调整等方式提升Java应用的运行...

    java经典权威面试题,找工作的好帮手

    10. **最新Java特性**:随着Java版本的更新,新特性不断引入,如Lambda表达式、Stream API、Optional类、模块化系统(Jigsaw)、反应式编程等。了解并能实际运用这些新特性,将使你在面试中更具竞争力。 掌握以上...

    java经典面试题与世界500强面试题

    5. **IO/NIO**:流的分类、字符编码、缓冲区、过滤器,以及Java NIO(非阻塞I/O)的优势和使用。 6. **网络编程**:TCP/UDP协议,Socket编程,HTTP/HTTPS协议的理解及其应用。 7. **设计模式**:单例、工厂、抽象...

    JAVA面试 试题及答案 相关内容

    - **多路复用**:了解NIO(非阻塞I/O)和Java NIO.2框架,如Selector和Channel。 8. **设计模式** - **单例模式**:了解多种实现方式,如饿汉式、懒汉式、双重检查锁定等。 - **工厂模式**:理解简单工厂、工厂...

    JavaGuide-main.rar

    理解Java虚拟机(JVM)的工作原理,包括类加载机制、内存区域(堆、栈、方法区等)、垃圾回收(GC)以及性能调优,对于解决内存泄漏和性能问题非常重要。 七、反射与动态代理 Java反射机制允许在运行时检查类、接口...

    Netty-4.1.97.Final源码

    Netty采用了Reactor模式,也称为反应器模式,这是一种多路复用的事件驱动模型。它将I/O事件与处理逻辑分离,通过事件循环(EventLoop)来轮询并处理待处理的事件,实现了高效的并发处理。在Netty中,EventLoopGroup...

    如何利用Java开发高性能、高并发Web应用.ppt

    9. **异步处理**:通过异步非阻塞IO(如NIO)和反应式编程,可以在处理高并发请求时,减少等待时间,提高系统吞吐量。 10. **代码重构**:定期进行代码审查和重构,消除冗余代码,减少不必要的计算和资源消耗,保持...

    深入浅出Redis-redis哨兵集群.docx

    它基于Java的非阻塞I/O(NIO)模式,通过`Selector`来处理客户端的连接事件。 - `Acceptor`线程循环调用`selector.select(1000L)`来等待新连接的到来。`1000L`表示超时时间为1毫秒,如果没有连接到达,它将在1毫秒...

    5. **IO/NIO**:Java的输入输出(IO)和非阻塞I/O(NIO)库提供了处理文件、网络数据传输的能力。 6. **多线程**:Java提供了内置的多线程支持,理解线程的创建、同步、通信对于构建并发程序至关重要。 7. **JVM**...

    程序员的面试模板及技巧资料.pdf,这是一份不错的文件

    - **JVM内存模型**: 包括堆内存、栈内存、方法区、本地方法栈等区域的用途和大小调整。 3. **数据类型转换**: - **基本类型转换规则**: 如何在整型之间转换,以及与包装类之间的转换。 4. **集合框架**: - **...

    blog-examples

    更高版本的Java还引入了更多新特性,如模块系统、反应式编程等,也可能在示例中有所体现。 以上只是部分可能包含的知识点,具体"blog-examples-master"中的内容还需解压文件后查看。这些示例代码对于学习和提升Java...

    挑战

    11. **Java 8及更新版本**:Java 8引入了Lambda表达式、Stream API和日期时间API等新特性,后续版本也不断添加新功能,如模块系统(Project Jigsaw)、反应式编程支持(Project Reactor)等。 12. **测试**:单元...

    PPE-HFT:PPE高频交易

    可能采用了线程池、非阻塞I/O(NIO)、反应式编程模型(如Reactor或Akka)等技术,以减少上下文切换和等待时间。 2. **数据结构与算法**:在高频交易中,数据处理速度是关键。项目可能使用了优化的数据结构,如队列...

    SO-answers:我的 StackOverflow 答案的代码示例

    8. **设计模式**:单例、工厂、观察者、装饰器、适配器等常见设计模式的应用。 9. **JVM 内存模型**:堆内存、栈内存、方法区、垃圾收集机制等。 10. **I/O 流**:NIO(New I/O)和 AIO(Asynchronous I/O)非阻塞...

    (2024)跳槽涨薪必备精选面试题.pdf

    - **服务雪崩**:大量服务崩溃导致的连锁反应。 - **服务限流**:限制进入服务的流量。 - **服务熔断**:当服务出现问题时,直接拒绝调用,防止故障传播。 - **服务降级**:在发生故障时提供简化版本的服务。 #...

Global site tag (gtag.js) - Google Analytics