1.前言
JDK1.4之前的传统阻塞IO(BIO),服务端需要为每一个客户端连接创建单独的线程为其服务,从JDK1.4开始NIO非阻塞式IO出现,它只需要单独的一个线程就能接收多个客户端请求,而真正处理各个请求的细节可以使用多线程的方式高效率的完成,这些处理线程与具体的业务逻辑分离,做到了IO的复用。
2.源码分析
首先以一段典型的NIO使用代码开始:
Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(9527)); ssc.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); if (n <= 0) continue; Iterator it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = (SelectionKey)it.next(); if (key.isAcceptable()){ SocketChannel sc= ((ServerSocketChannel) key.channel()).accept(); sc.configureBlocking(false); sc.register(key.selector(), SelectionKey.OP_READ|SelectionKey.OP_WRITE); } if (key.isReadable()){ SocketChannel channel = ((SocketChannel) key.channel()); ByteBuffer bf = ByteBuffer.allocate(10); int read = channel.read(bf); System.out.println("read "+read+" : "+new String(bf.array()).trim()); } if (key.isWritable()){ SocketChannel channel = ((SocketChannel) key.channel()); channel.write(ByteBuffer.wrap(new String("hello client").getBytes())); } it.remove(); } }
2.1 Selector.open() 获取选择器。
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
从Selector源码中可以看到,open方法是交给selectorProvider处理的。 其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;Linux平台会根据不同的内核版本选择是使用select/poll模式还是epoll模式。
public static SelectorProvider create() { PrivilegedAction pa = new GetPropertyAction("os.name"); String osname = (String) AccessController.doPrivileged(pa); if ("SunOS".equals(osname)) { return new sun.nio.ch.DevPollSelectorProvider(); } // use EPollSelectorProvider for Linux kernels >= 2.6 if ("Linux".equals(osname)) { pa = new GetPropertyAction("os.version"); String osversion = (String) AccessController.doPrivileged(pa); String[] vers = osversion.split("\\.", 0); if (vers.length >= 2) { try { int major = Integer.parseInt(vers[0]); int minor = Integer.parseInt(vers[1]); if (major > 2 || (major == 2 && minor >= 6)) { return new sun.nio.ch.EPollSelectorProvider(); } } catch (NumberFormatException x) { // format not recognized } } } return new sun.nio.ch.PollSelectorProvider(); } sun.nio.ch.EPollSelectorProvider public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } sun.nio.ch.PollSelectorProvider public AbstractSelector openSelector() throws IOException { return new PollSelectorImpl(this); }
可以看到,如果Linux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider,实际上这是在JDK5U9之后才有这样的更新。
public static SelectorProvider create() { return new sun.nio.ch.WindowsSelectorProvider(); } sun.nio.ch.WindowsSelectorProvider public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); } WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } void addWakeupSocket(int fdVal, int index) { putDescriptor(index, fdVal); putEventOps(index, POLLIN); }
接下来,以Windows的实现为准进行分析。在openSelector方法里面实例化WindowsSelectorImpl的过程中,
1).实例化了PollWrapper,pollWrapper用Unsafe类申请一块物理内存,用于存放注册时的socket句柄fdVal和event的数据结构pollfd.
2)Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里.addWakeupSocket方法将source的POLLIN事件(有数据可读)标识为感兴趣的,当sink端有数据写入时,source对应的文件描述描wakeupSourceFd就会处于就绪状态.
public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); } public Pipe openPipe() throws IOException { return new PipeImpl(this); } PipeImpl(final SelectorProvider sp) throws IOException { try { AccessController.doPrivileged(new Initializer(sp)); } catch (PrivilegedActionException x) { throw (IOException)x.getCause(); } } private Initializer(SelectorProvider sp) { this.sp = sp; } public Void run() throws IOException { LoopbackConnector connector = new LoopbackConnector(); connector.run(); ....//省略 } private class LoopbackConnector implements Runnable { @Override public void run() { ServerSocketChannel ssc = null; SocketChannel sc1 = null; SocketChannel sc2 = null; try { // Loopback address InetAddress lb = InetAddress.getByName("127.0.0.1"); assert(lb.isLoopbackAddress()); InetSocketAddress sa = null; for(;;) { // Bind ServerSocketChannel to a port on the loopback // address if (ssc == null || !ssc.isOpen()) { ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(lb, 0)); sa = new InetSocketAddress(lb, ssc.socket().getLocalPort()); } // Establish connection (assume connections are eagerly // accepted) sc1 = SocketChannel.open(sa); ByteBuffer bb = ByteBuffer.allocate(8); long secret = rnd.nextLong(); bb.putLong(secret).flip(); sc1.write(bb); // Get a connection and verify it is legitimate sc2 = ssc.accept(); bb.clear(); sc2.read(bb); bb.rewind(); if (bb.getLong() == secret) break; sc2.close(); sc1.close(); } // Create source and sink channels source = new SourceChannelImpl(sp, sc1); sink = new SinkChannelImpl(sp, sc2); } catch (IOException e) { try { if (sc1 != null) sc1.close(); if (sc2 != null) sc2.close(); } catch (IOException e2) {} ioe = e; } finally { try { if (ssc != null) ssc.close(); } catch (IOException e2) {} } } } }
通过创建管道的代码分析:创建管道的具体实现方式也是与具体的操作系统紧密相关的,这里以Windows为例,创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法,在run方法里面,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。通过查阅资料,而在Linux下则是直接使用操作系统提供的管道。
到这里,Selector.open()就完成了,总结一下,主要完成以下几件事:
1.实例化pollWrapper对象,用于将来存放注册时的socket句柄fdVal和event的数据结构pollfd。
2.根据不同操作系统实现了用于自我唤醒的管道,Windows通过创建一对自己连着自己的socket通道,Linux直接使用系统提供的管道。同时,根据linux的不同内核版本还会选择底层进行事件通知的不同机制select/poll或者epoll。
2.2 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);通道注册
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{ synchronized (regLock) { SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }如果该channel和selector已经注册过,则直接添加事件和附件。否则通过selector实现注册过程。
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; } protected void implRegister(SelectionKeyImpl ski) { synchronized (closeLock) { if (pollWrapper == null) throw new ClosedSelectorException(); growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); fdMap.put(ski); keys.add(ski); pollWrapper.addEntry(totalChannels, ski); totalChannels++; } } private void growIfNeeded() { if (channelArray.length == totalChannels) { int newSize = totalChannels * 2; // Make a larger array SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); channelArray = temp; pollWrapper.grow(newSize); } if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); totalChannels++; threadsCount++; } } void addEntry(int index, SelectionKeyImpl ski) { putDescriptor(index, ski.channel.getFDVal()); }通过selector注册的过程主要完成以下几件事:
- 以当前channel和selector为参数,初始化 SelectionKeyImpl 对象,并添加附件attachment。
- 如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
- 如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。windows上select系统调用有最大文件描述符限制,一次只能轮询1024个文件描述符,如果多于1024个,需要多线程进行轮询。
- ski.setIndex(totalChannels)选择键记录下在数组中的索引位置。
- keys.add(ski);将选择键加入到已注册键的集合中。
- fdMap.put(ski);保存选择键对应的文件描述符与选择键的映射关系。
- pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
- k.interestOps(ops)方法最终也会把event添加到对应的pollfd。
2.3 selector.select();
public int select() throws IOException { return select(0); } public int select(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("Negative timeout"); return lockAndDoSelect((timeout == 0) ? -1 : timeout); } private int lockAndDoSelect(long timeout) throws IOException { synchronized (this) { if (!isOpen()) throw new ClosedSelectorException(); synchronized (publicKeys) { synchronized (publicSelectedKeys) { return doSelect(timeout); } } } }当调用selector.select()以及select(0)时,JDK对参数进行修正,其实传给doSelect的timeout为-1。当调用的是selectNow()的时候,timeout则为0,直接以负数作为参数则会抛出异常,其中的doSelector又回到我们的Windows实现:
相关推荐
本文将深入探讨Java NIO中的Selector机制,并通过源码分析来理解其实现原理。 Selector机制是Java NIO中的核心组件,它允许单线程同时监控多个通道(Channels)的状态变化,例如连接就绪、数据可读或可写等。这种...
Java NIO,全称为New Input/Output,是Java在1.4版本引入的一个新特性,旨在提供一种更高效、更具选择性的I/O模型。...通过分析NIO源码,我们可以深入了解其内部工作原理,进一步优化和调试相关代码。
Java NIO(New IO)是Java 1.4版本引入的一种新的I/O API,它提供了非阻塞I/O操作的能力,极大地提升了Java在...通过分析和学习这个源码,开发者可以深入理解Java NIO的工作原理,并将其应用于实际的网络编程项目中。
Java NIO(New IO)是Java 1.4版本引入的一个新API,全称为Non-blocking Input/Output,它提供了一种不同于传统IO的编程模型,传统IO基于块I/O,而NIO则基于通道(Channel)和缓冲区(Buffer)进行数据传输。NIO的...
在这个“java nio 聊天室源码”项目中,开发者使用了NIO来构建一个聊天室应用,以实现用户之间的实时通信。 1. **Java NIO基础** - **通道(Channel)**:在NIO中,数据是通过通道进行传输的,如SocketChannel、...
5. **源码分析** - **服务器源码**:服务器主要代码可能包括设置监听、接受连接、注册选择器、处理选择器返回的事件等部分。 - **客户端源码**:客户端代码则涉及连接服务器、发送数据、接收数据和处理响应等环节...
[第4节] JavaNIO流-通道1.flv [第5节] Java NIO流-通道2.flv [第6节] Java NIO流-socket通道操作.flv [第7节] Java NIO流-文件通道操作.flv [第8节] Java NIO流-选择器 .flv [第9节] Java NIO流-选择器操作.flv...
通过分析这些文件,开发者可以学习如何使用Java NIO构建一个事件驱动的服务器,包括如何创建和配置通道、如何注册通道到选择器、如何使用选择器进行多路复用、如何处理读写事件以及如何优化服务器性能等。...
源码分析可以帮助我们深入理解NIO的工作原理,从而更好地利用它来优化我们的应用程序。在"nioSamples"这个压缩包中,可能包含了各种NIO场景的示例代码,可以作为学习和调试NIO的参考。 通过学习和实践这些源码,...
Java NIO(New IO)是Java 1.4版本引入的一个新特性,是对传统IO模型的重大改进。在传统的IO模型中,数据传输基于字节流...同时,通过分析源代码,你将能更好地掌握NIO在实际项目中的应用,提升你的Java并发编程技能。
通过分析这个项目,你可以学习如何使用Java NIO进行网络编程,理解服务器端如何设置监听,客户端如何建立连接,以及如何在非阻塞模式下进行数据交换。这对于提升Java服务器开发和网络编程能力是非常有帮助的。
分析这个文件可以帮助我们更好地理解NIO的实际应用。 总之,Java NIO通过非阻塞的方式提高了系统处理并发I/O的能力,是构建高性能网络服务的重要工具。通过熟练掌握NIO的架构和使用,开发者可以设计出更加高效、可...
通过分析这个NIO项目源码,你可以学习到如何在实际项目中应用NIO技术,理解其设计模式和优化策略,对于提升Java网络编程的能力大有裨益。深入研究NIO的细节,有助于你在开发高效、高并发的服务时做出明智的选择。
《深入理解Java NIO:基于mldn nio ppt源码解析》 Java NIO(Non-blocking Input/Output,非阻塞I/O)是Java在JDK 1.4引入的一...同时,源码分析也有助于我们解决实际开发中遇到的I/O问题,更好地优化系统资源的使用。
在分布式Java应用中,TCP/IP协议和NIO(非阻塞I/O)是构建高性能、高可用性系统的关键技术。TCP/IP是一种传输层协议,确保数据在网络中的可靠传输,而NIO是Java提供的一个I/O模型,允许程序进行非阻塞的数据读写,...
JDK源码分析是提升Java编程能力的关键步骤,其中`sunw`包是Sun Microsystems公司的特定实现,通常包含了一些与具体操作系统相关的代码,而`com`和`org`包则可能包含了第三方库或JDK的内部实现。通过阅读这些源码,...
源码分析可以了解注解的处理流程,以及如何自定义注解处理器。 通过深入学习这些源码,开发者可以提升对Java的理解,优化代码性能,甚至为Java社区贡献自己的改进。记住,源码是学习的最好教材,它揭示了软件设计的...
我们将会分析`EchoServer.java`、`EchoClient.java`和`SocketUtils.java`这三个文件中的关键知识点。 首先,让我们从`EchoServer.java`开始。这是一个典型的回显服务器,它的主要任务是接收客户端发送的数据并...
`NonBlockingServer.java`和`Client.java`的代码分析和实践,可以帮助我们更好地理解和掌握Java NIO在Socket通信中的应用。在实际开发中,我们还需要考虑异常处理、资源管理等细节,以确保系统的稳定性和可靠性。