Selector:java nio无阻塞io实现的关键。
阻塞io和无阻塞io:
阻塞io是指jdk1.4之前版本面向流的io,服务端需要对每个请求建立一堆线程等待请求,而客户端发送请求后,先咨询服务端是否有线程相应,如果没有则会一直等待或者遭到拒 绝请求,如果有的话,客户端会线程会等待请求结束后才继续执行。
当并发量大,而后端服务或客户端处理数据慢时就会产生产生大量线程处于等待中,即上述的阻塞。
无阻塞io是使用单线程或者只使用少量的多线程,每个连接共用一个线程,当处于等待(没有事件)的时候线程资源可以释放出来处理别的请求,通过事件驱动模型当有accept/read/write等事件发生后通知(唤醒)主线程分配资源来处理相关事件。java.nio.channels.Selector就是在该模型中事件的观察者,可以将多个SocketChannel的事件注册到一个Selector上,当没有事件发生时Selector处于阻塞状态,当SocketChannel有accept/read/write等事件发生时唤醒Selector。
这个Selector是使用了单线程模型,主要用来描述事件驱动模型,要优化性能需要一个好的线程模型来使用,目前比较好的nio框架有Netty,apache的mina等。线程模型这块后面再分享,这里重点研究Selector的阻塞和唤醒原理。
先看一段简单的Selector使用的代码
selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(port)); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { // select()阻塞,等待有事件发生唤醒 int selected = selector.select(); if (selected > 0) { Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { // 处理 accept 事件 } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { // 处理 read 事件 } else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { // 处理 write 事件 } selectedKeys.remove(); } } }
代码中关键的几个点在:
Selector.open();
selector.select();
阻塞后唤醒可以通过注册在selector上的socket有事件发生 或者 selector.select(timeOut)超时 或者 selector.wakeup()主动唤醒;
整个阻塞和唤醒的过程涉及到的点非常多,先上一张梳理出的整体图,再进入源码会比较容易理解
现在通过openjdk中的源码来解析上图中的每一个环节:
1. Selector.open()
Selector.java ----- public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
先看看SelectorProvider.provider()做了什么:
SelectorProvider.java ----- public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return (SelectorProvider)AccessController .doPrivileged(new PrivilegedAction() { public Object run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;
这里主要以windows的实现来梳理整个流程,拿到provider后来看openSelector()中的实现
WindowsSelectorProvider.java ---- public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); } WindowsSelectorImpl.java ---- WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }
这段代码中做了如下几个事情
Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里;
那么为什么需要一个管道,这个管道是怎么实现的?接下来看Pipe.open()做了什么
Pipe.java ---- public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); }
同样,SelectorProvider.provider()也是获取操作系统相关的实现
SelectorProvider.java ---- public Pipe openPipe() throws IOException { return new PipeImpl(this); }
这里还是看windows下的实现
PipeImpl.java ---- PipeImpl(final SelectorProvider sp) throws IOException { try { AccessController.doPrivileged(new Initializer(sp)); } catch (PrivilegedActionException x) { throw (IOException)x.getCause(); } }
创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法
PipeImpl.Initializer ----- public Object run() throws IOException { ServerSocketChannel ssc = null; SocketChannel sc1 = null; SocketChannel sc2 = null; try { // loopback address InetAddress lb = InetAddress.getByName("127.0.0.1"); assert(lb.isLoopbackAddress()); // bind ServerSocketChannel to a port on the loopback address ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(lb, 0)); // Establish connection (assumes connections are eagerly // accepted) InetSocketAddress sa = new InetSocketAddress(lb, ssc.socket().getLocalPort()); sc1 = SocketChannel.open(sa); ByteBuffer bb = ByteBuffer.allocate(8); long secret = rnd.nextLong(); bb.putLong(secret).flip(); sc1.write(bb); // Get a connection and verify it is legitimate for (;;) { sc2 = ssc.accept(); bb.clear(); sc2.read(bb); bb.rewind(); if (bb.getLong() == secret) break; sc2.close(); } // Create source and sink channels source = new SourceChannelImpl(sp, sc1); sink = new SinkChannelImpl(sp, sc2); } catch (IOException e) { try { if (sc1 != null) sc1.close(); if (sc2 != null) sc2.close(); } catch (IOException e2) { } IOException x = new IOException("Unable to establish" + " loopback connection"); x.initCause(e); throw x; } finally { try { if (ssc != null) ssc.close(); } catch (IOException e2) { } } return null; }
这里即为上图中最下面那部分创建pipe的过程,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。
source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))
PollArrayWrapper.java ---- private AllocatedNativeObject pollArray; // The fd array // Adds Windows wakeup socket at a given index. void addWakeupSocket(int fdVal, int index) { putDescriptor(index, fdVal); putEventOps(index, POLLIN); } // Access methods for fd structures void putDescriptor(int i, int fd) { pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd); } void putEventOps(int i, int event) { pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event); }
这里将source的POLLIN事件标识为感兴趣的,当sink端有数据写入时,source对应的文件描述符wakeupSourceFd就会处于就绪状态
AllocatedNativeObject.java ---- class AllocatedNativeObject extends NativeObject AllocatedNativeObject(int size, boolean pageAligned) { super(size, pageAligned); } NativeObject.java ---- protected NativeObject(int size, boolean pageAligned) { if (!pageAligned) { this.allocationAddress = unsafe.allocateMemory(size); this.address = this.allocationAddress; } else { int ps = pageSize(); long a = unsafe.allocateMemory(size + ps); this.allocationAddress = a; this.address = a + ps - (a & (ps - 1)); } }
从以上可以看到pollArray是通过unsafe.allocateMemory(size + ps)分配的一块系统内存
到这里完成了Selector.open(),主要完成建立Pipe,并把pipe的wakeupSourceFd放入pollArray中,这个pollArray是Selector的枢纽。这里是以Windows的实现来看,在windows下通过两个链接的socketChannel实现了Pipe,linux下则是直接使用系统的pipe。
2. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
AbstractSelectableChannel.java --> register() --> SelectorImpl.java ---- protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; }
关键是implRegister(k);
WindowsSelectorImpl.java ---- protected void implRegister(SelectionKeyImpl ski) { growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); fdMap.put(ski); keys.add(ski); pollWrapper.addEntry(totalChannels, ski); totalChannels++; } PollArrayWrapper.java ---- void addEntry(int index, SelectionKeyImpl ski) { putDescriptor(index, ski.channel.getFDVal()); }
这里把socketChannel的文件描述符放到pollArray中。
3. selector.select();
SelectorImpl.java ---- public int select(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("Negative timeout"); return lockAndDoSelect((timeout == 0) ? -1 : timeout); } private int lockAndDoSelect(long timeout) throws IOException { synchronized (this) { if (!isOpen()) throw new ClosedSelectorException(); synchronized (publicKeys) { synchronized (publicSelectedKeys) { return doSelect(timeout); } } } }
其中的doSelector又回到我们的Windows实现:
WindowsSelectorImpl.java ---- protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); int updated = updateSelectedKeys(); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; } private int poll() throws IOException{ // poll for the main thread return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); }
private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
其他的都是一些准备工作,关键是subSelector.poll(),最后调用了native的poll0,并把pollWrapper.pollArrayAddress作为参数传给poll0,那么poll0对pollArray做了什么:
WindowsSelectorImpl.c ---- Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this, jlong pollAddress, jint numfds, jintArray returnReadFds, jintArray returnWriteFds, jintArray returnExceptFds, jlong timeout) { // 代码.... 此处省略一万字 /* Call select */ if ((result = select(0 , &readfds, &writefds, &exceptfds, tv)) == SOCKET_ERROR) { // 代码.... 此处省略一万字 for (i = 0; i < numfds; i++) { // 代码.... 此处省略一万字 } } }
C代码已经忘得差不多了,但这里可以看到实现思路是调用c的select方法,这里的select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,然后对每个被SET的描述符调用进行poll,并记录在临时结果中(fdset),如果有事件发生,select会将临时结果写到用户空间并返回;当轮询一遍后没有任何事件发生时,如果指定了超时时间,则select会睡眠到超时,睡眠结束后再进行一次轮询,并将临时结果写到用户空间,然后返回。
这里的select就是轮询pollArray中的FD,看有没有事件发生,如果有事件发生收集所有发生事件的FD,退出阻塞。
关于select系统调用参考了《select、poll、epoll的比较》这篇文章,同时看到nio的select在不同平台上的实现不同,在linux上通过epoll可以不用轮询,在第一次调用后,事件信息就会与对应的epoll描述符关联起来,待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。
到这里已经比较清楚了,退出阻塞的方式有:regist在selector上的socketChannel处于就绪状态(放在pollArray中的socketChannel的FD就绪) 或者 第1节中放在pollArray中的wakeupSourceFd就绪。前者(socketChannel)就绪唤醒应证了文章开始的阻塞->事件驱动->唤醒的过程,后者(wakeupSourceFd)就是下面要看的主动wakeup。
4. selector.wakeup()
WindowsSelectorImpl.java ---- public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { setWakeupSocket(); interruptTriggered = true; } } return this; } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int wakeupSinkFd);
native实现摘要:
WindowsSelectorImpl.c ---- Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) { /* Write one byte into the pipe */ send(scoutFd, (char*)&POLLIN, 1, 0); }
这里完成了向最开始建立的pipe的sink端写入了一个字节,source文件描述符就会处于就绪状态,poll方法会返回,从而导致select方法返回。(原来自己建立一个socket链着自己另外一个socket就是为了干这事)
到此通过源码分析理清了Selector阻塞唤醒原理
参考学习文章:
http://www.cnblogs.com/xuxm2007/archive/2011/08/15/2139809.html
http://blog.chinaunix.net/uid-20543672-id-3267385.html
http://goon.iteye.com/blog/1775421
http://blog.csdn.net/aesop_wubo/article/details/9117655
相关推荐
Java NIO(非阻塞I/O)是Java标准库中的一种I/O模型,与传统的BIO(阻塞I/O)模型相比,它提供了更高效、更灵活的I/O操作方式。Selector是Java NIO框架中的核心组件,它使得单个线程能够管理多个通道(Channels),...
Selector用于监听多个通道(Channel)的事件,如连接建立、数据读写等,当某个事件发生时,它会唤醒等待的线程,避免了单个线程因阻塞而浪费资源。Channel是数据传输的通道,例如SocketChannel和FileChannel,它们...
Java NIO(非阻塞I/O)中的Selector是一个核心组件,它允许单个线程处理多个通道(channels)上的I/O事件。Selector的角色就像一个交通指挥员,能够监控多个通道并决定哪个通道准备好进行读写操作,从而提高系统的...
Java NIO(非阻塞I/O)是一种在Java中处理I/O操作的高效方式,它引入了选择器(Selector)的概念,使得一个单独的线程可以监控多个输入输出通道(Channels),大大提高了并发处理能力。在这个"java基于NIO选择器...
NIO的核心在于`Selector`,它能够监视多个通道(Channel),当通道准备就绪时,Selector会唤醒阻塞在选择操作上的线程。因此,一个线程可以处理多个连接,极大地提高了服务器的并发能力。 关键类包括:`java.nio....
当`Selector`轮询这些`Channel`时,如果发现有事件发生,它就会唤醒等待的线程进行处理,这意味着只需要一个线程就可以管理多个连接,大大提升了服务器的并发处理能力。 `Socket`在NIO中的实现是`SocketChannel`,...
- **唤醒和关闭**:当有新连接或数据到达时,通道会触发选择器,唤醒阻塞在`select()`上的线程。 5. **实例代码解析** - **服务器端代码**:可能包括`ServerSocketChannel`用于监听连接,`SocketChannel`用于与每...
总结来说,Java NIO提供了一种高效、非阻塞的I/O模型,通过Channel、Selector和Buffer,可以更灵活地处理网络连接和文件I/O,尤其适合处理高并发的场景。理解并掌握这些概念,对于优化Java应用的性能和可扩展性至关...
服务器通常使用Selector来监控多个通道的状态变化,一旦有新的连接请求或数据到达,Selector会唤醒处理线程,避免了为每个连接分配单独线程的开销。这样,服务器可以同时处理成千上万的连接,提高了系统的可扩展性。...
通过对mldn nio ppt源码的学习,我们可以更深入地理解Java NIO的工作原理,掌握其核心概念,并能应用于实际的并发I/O编程中,提升程序性能。同时,源码分析也有助于我们解决实际开发中遇到的I/O问题,更好地优化系统...
- **Java NIO原理及通信模型**:NIO采用了非阻塞的方式,通过一个单独的线程来处理所有IO事件,使用事件驱动机制,当事件发生时才进行处理。它依赖于选择器(Selector)来监控多个通道(Channel)上的事件,如连接...
而在NIO中,使用Selector可以实现非阻塞I/O,线程不再需要等待,而是能够处理其他任务,当数据准备好时,Selector会通知线程进行处理。 6. **内存映射文件**: `MappedByteBuffer`是NIO的一个重要特性,它允许直接...
Selector注册了感兴趣的事件(如连接就绪、数据可读或可写),然后在事件发生时唤醒,这样就可以高效地处理多个通道。例如,一个服务器可能需要同时处理多个客户端的连接请求和数据传输,Selector可以帮助我们实现这...
这种服务器利用了Java的NIO库,包括Selector、Channel和Buffer等核心组件,以非阻塞的方式管理网络通信,从而提高系统的并行处理能力。 NIO(New Input/Output)是Java从1.4版本开始引入的一个新特性,与传统的BIO...
而NIO引入了非阻塞I/O,允许程序在不能立即获得数据时不会被挂起,可以继续执行其他任务,提高了系统在处理多个连接请求时的效率。 Java NIO服务器设计通常涉及到以下几个核心概念: 1. **通道(Channels)**:...
为了能够在某些情况下中断或提前结束`Selector`的阻塞状态,Java NIO 提供了`wakeup()`方法。当调用`wakeup()`方法时,会触发一个虚拟的I/O事件,使得`Selector`立即从阻塞状态中返回。 **具体实现原理如下:** - ...
NIO通过选择器(Selector)监控多个通道,当通道有事件发生时,选择器唤醒,然后处理事件,避免了线程的过度消耗。NIO适用于高并发、低延迟的网络应用。 **4. Netty** Netty是一个高性能、异步事件驱动的网络应用...
xSocket通过使用NIO的多路复用器(Selector)来监听多个套接字通道,当有数据可读或可写时,Selector会唤醒线程进行相应的处理,而不是一直阻塞等待。这种方式使得xSocket可以在一个线程中管理大量的连接,减少了...
Java NIO(New IO)是Java 1.4版本引入的一种新的I/O API,它提供了非阻塞I/O操作的能力,极大地提升了Java在...通过分析和学习这个源码,开发者可以深入理解Java NIO的工作原理,并将其应用于实际的网络编程项目中。
在本项目"基于NIO的Java聊天室2"中,我们深入探讨了如何利用Java的非阻塞I/O(New Input/Output,NIO)框架来实现一个多人在线聊天室。NIO是一种高效的I/O模型,它允许程序在不阻塞主线程的情况下处理输入和输出操作...