`

Java NIO——Selector机制解析三(源码分析)

 
阅读更多

 最近一直在看java nio,对其中的selector比较感兴趣,所有就先在网上查了些资料,发现还真有很多人研究过这个,其中尤以皓哥写的比较有意思,也很使我受启发,我也转了他的博客Java NIO——Selector机制解析《转》,但是我一直不明白pipe是如何唤醒selector的,所以又去看了jdk的源码(openjdk下载),整理了如下:

Java nio自带demo : OperationServer.java   OperationClient.java(见附件)

其中server端的核心代码:

Java代码 复制代码 收藏代码
  1. public void initSelector() {   
  2.         try {   
  3.             selector = SelectorProvider.provider().openSelector();   
  4.             this.serverChannel1 = ServerSocketChannel.open();   
  5.             serverChannel1.configureBlocking(false);   
  6.             InetSocketAddress isa = new InetSocketAddress("localhost"this.port1);   
  7.             serverChannel1.socket().bind(isa);   
  8.             serverChannel1.register(selector, SelectionKey.OP_ACCEPT);   
  9.         } catch (IOException e) {   
  10.             // TODO Auto-generated catch block   
  11.             e.printStackTrace();   
  12.         }   
  13. }  
public void initSelector() {
        try {
            selector = SelectorProvider.provider().openSelector();
            this.serverChannel1 = ServerSocketChannel.open();
            serverChannel1.configureBlocking(false);
            InetSocketAddress isa = new InetSocketAddress("localhost", this.port1);
            serverChannel1.socket().bind(isa);
            serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
}

 

从头开始,

先看看SelectorProvider.provider()做了什么:

 

 

 

Java代码 复制代码 收藏代码
  1. public static SelectorProvider provider() {   
  2.         synchronized (lock) {   
  3.             if (provider != null)   
  4.                 return provider;   
  5.             return AccessController.doPrivileged(   
  6.                 new PrivilegedAction<SelectorProvider>() {   
  7.                     public SelectorProvider run() {   
  8.                             if (loadProviderFromProperty())   
  9.                                 return provider;   
  10.                             if (loadProviderAsService())   
  11.                                 return provider;   
  12.                             provider = sun.nio.ch.DefaultSelectorProvider.create();   
  13.                             return provider;   
  14.                         }   
  15.                     });   
  16.         }   
  17.     }  
public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

 

其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;而if (provider != null)

                returnprovider;

保证了整个server程序中只有一个WindowsSelectorProvider对象;

再看看WindowsSelectorProvider. openSelector():

Java代码 复制代码 收藏代码
  1. public AbstractSelector openSelector() throws IOException {   
  2.         return new WindowsSelectorImpl(this);   
  3.     }   
  4. new WindowsSelectorImpl(SelectorProvider)代码:   
  5. WindowsSelectorImpl(SelectorProvider sp) throws IOException {   
  6.         super(sp);   
  7.         pollWrapper = new PollArrayWrapper(INIT_CAP);   
  8.         wakeupPipe = Pipe.open();   
  9.         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();   
  10.   
  11.         // Disable the Nagle algorithm so that the wakeup is more immediate   
  12.         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();   
  13.         (sink.sc).socket().setTcpNoDelay(true);   
  14.         wakeupSinkFd = ((SelChImpl)sink).getFDVal();   
  15.   
  16.         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);   
  17.     }  
public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
new WindowsSelectorImpl(SelectorProvider)代码:
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        wakeupPipe = Pipe.open();
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

        // Disable the Nagle algorithm so that the wakeup is more immediate
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
        (sink.sc).socket().setTcpNoDelay(true);
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();

        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

 

其中Pipe.open()是关键,这个方法的调用过程是:

Java代码 复制代码 收藏代码
  1. public static Pipe open() throws IOException {   
  2.         return SelectorProvider.provider().openPipe();   
  3. }   
  4. SelectorProvider 中:   
  5. public Pipe openPipe() throws IOException {   
  6.         return new PipeImpl(this);   
  7. }  
public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
}
SelectorProvider 中:
public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
}

 

再看看怎么new PipeImpl()的:

Java代码 复制代码 收藏代码
  1. PipeImpl(SelectorProvider sp) {   
  2.         long pipeFds = IOUtil.makePipe(true);   
  3.         int readFd = (int) (pipeFds >>> 32);   
  4.         int writeFd = (int) pipeFds;   
  5.         FileDescriptor sourcefd = new FileDescriptor();   
  6.         IOUtil.setfdVal(sourcefd, readFd);   
  7.         source = new SourceChannelImpl(sp, sourcefd);   
  8.         FileDescriptor sinkfd = new FileDescriptor();   
  9.         IOUtil.setfdVal(sinkfd, writeFd);   
  10.         sink = new SinkChannelImpl(sp, sinkfd);   
  11.  }  
PipeImpl(SelectorProvider sp) {
        long pipeFds = IOUtil.makePipe(true);
        int readFd = (int) (pipeFds >>> 32);
        int writeFd = (int) pipeFds;
        FileDescriptor sourcefd = new FileDescriptor();
        IOUtil.setfdVal(sourcefd, readFd);
        source = new SourceChannelImpl(sp, sourcefd);
        FileDescriptor sinkfd = new FileDescriptor();
        IOUtil.setfdVal(sinkfd, writeFd);
        sink = new SinkChannelImpl(sp, sinkfd);
 }

 

其中IOUtil.makePipe(true)是个native方法:

/**

     * Returns two file descriptors for a pipe encoded in a long.

     * The read end of the pipe is returned in the high 32 bits,

     * while the write end is returned in the low 32 bits.

     */

staticnativelong makePipe(boolean blocking);

具体实现:

 

 

 

Java代码 复制代码 收藏代码
  1. JNIEXPORT jlong JNICALL   
  2. Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)   
  3. {   
  4.     int fd[2];   
  5.   
  6.     if (pipe(fd) < 0) {   
  7.         JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");   
  8.         return 0;   
  9.     }   
  10.     if (blocking == JNI_FALSE) {   
  11.         if ((configureBlocking(fd[0], JNI_FALSE) < 0)   
  12.             || (configureBlocking(fd[1], JNI_FALSE) < 0)) {   
  13.             JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");   
  14.             close(fd[0]);   
  15.             close(fd[1]);   
  16.             return 0;   
  17.         }   
  18.     }   
  19.     return ((jlong) fd[0] << 32) | (jlong) fd[1];   
  20. }   
  21. static int  
  22. configureBlocking(int fd, jboolean blocking)   
  23. {   
  24.     int flags = fcntl(fd, F_GETFL);   
  25.     int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);   
  26.   
  27.     return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);   
  28. }  
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];

    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
static int
configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}

 

正如这段注释:

/**

     * Returns two file descriptors for a pipe encoded in a long.

     * The read end of the pipe is returned in the high 32 bits,

     * while the write end is returned in the low 32 bits.

     */

High32位存放的是通道read端的文件描述符FDfile descriptor),low 32 bits存放的是write端的文件描述符。所以取到makepipe()返回值后要做移位处理。

 

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

这行代码把返回的pipewrite端的FD放在了pollWrapper中(后面会发现,这么做是为了实现selectorwakeup()

 

ServerSocketChannel.open()的实现:

Java代码 复制代码 收藏代码
  1. public static ServerSocketChannel open() throws IOException {   
  2.         return SelectorProvider.provider().openServerSocketChannel();   
  3. }   
  4. SelectorProvider:   
  5. public ServerSocketChannel openServerSocketChannel() throws IOException {   
  6.         return new ServerSocketChannelImpl(this);   
  7. }  
public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
}
SelectorProvider:
public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
}

 

可见创建的ServerSocketChannelImpl也有WindowsSelectorImpl的引用。

 

Java代码 复制代码 收藏代码
  1. ServerSocketChannelImpl(SelectorProvider sp) throws IOException {   
  2.         super(sp);   
  3.         this.fd =  Net.serverSocket(true);  //打开一个socket,返回FD   
  4.         this.fdVal = IOUtil.fdVal(fd);   
  5.         this.state = ST_INUSE;   
  6. }  
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        this.fd =  Net.serverSocket(true);	//打开一个socket,返回FD
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
}

 

然后通过serverChannel1.register(selector, SelectionKey.OP_ACCEPT);selectorchannel绑定在一起,也就是把new ServerSocketChannel时创建的FDselector绑定在了一起。

到此,server端已启动完成了,主要创建了以下对象:

WindowsSelectorProvider单例

WindowsSelectorImpl中包含:

    pollWrapper:保存selector上注册的FD,包括pipewriteFDServerSocketChannel所用的FD

    wakeupPipe通道(其实就是两个FD,一个read,一个write

 

再到Server 中的run():

selector.select();主要调用了WindowsSelectorImpl中的这个方法:

 

 

 

Java代码 复制代码 收藏代码
  1. protected int doSelect(long timeout) throws IOException {   
  2.         if (channelArray == null)   
  3.             throw new ClosedSelectorException();   
  4.         this.timeout = timeout; // set selector timeout   
  5.         processDeregisterQueue();   
  6.         if (interruptTriggered) {   
  7.             resetWakeupSocket();   
  8.             return 0;   
  9.         }   
  10.         // Calculate number of helper threads needed for poll. If necessary   
  11.         // threads are created here and start waiting on startLock   
  12.         adjustThreadsCount();   
  13.         finishLock.reset(); // reset finishLock   
  14.         // Wakeup helper threads, waiting on startLock, so they start polling.   
  15.         // Redundant threads will exit here after wakeup.   
  16.         startLock.startThreads();   
  17.         // do polling in the main thread. Main thread is responsible for   
  18.         // first MAX_SELECTABLE_FDS entries in pollArray.   
  19.         try {   
  20.             begin();   
  21.             try {   
  22.                 subSelector.poll();   
  23.             } catch (IOException e) {   
  24.                 finishLock.setException(e); // Save this exception   
  25.             }   
  26.             // Main thread is out of poll(). Wakeup others and wait for them   
  27.             if (threads.size() > 0)   
  28.                 finishLock.waitForHelperThreads();   
  29.           } finally {   
  30.               end();   
  31.           }   
  32.         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.   
  33.         finishLock.checkForException();   
  34.         processDeregisterQueue();   
  35.         int updated = updateSelectedKeys();   
  36.         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.   
  37.         resetWakeupSocket();   
  38.         return updated;   
  39.     }  
protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        this.timeout = timeout; // set selector timeout
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();
        int updated = updateSelectedKeys();
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket();
        return updated;
    }

 其中subSelector.poll()是核心,也就是轮训pollWrapper保存的FD;具体实现是调用native方法poll0

Java代码 复制代码 收藏代码
  1. private int poll() throws IOException{ // poll for the main thread   
  2.             return poll0(pollWrapper.pollArrayAddress,   
  3.                          Math.min(totalChannels, MAX_SELECTABLE_FDS),   
  4.                          readFds, writeFds, exceptFds, timeout);   
  5.         }   
  6. private native int poll0(long pollAddress, int numfds,   
  7.              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);   
  8. // These arrays will hold result of native select().   
  9.             // The first element of each array is the number of selected sockets.   
  10.         // Other elements are file descriptors of selected sockets.   
  11.         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存发生read的FD   
  12.         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生write的FD   
  13.         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生except的FD  
private int poll() throws IOException{ // poll for the main thread
            return poll0(pollWrapper.pollArrayAddress,
                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
                         readFds, writeFds, exceptFds, timeout);
        }
private native int poll0(long pollAddress, int numfds,
             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
// These arrays will hold result of native select().
    		// The first element of each array is the number of selected sockets.
        // Other elements are file descriptors of selected sockets.
        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存发生read的FD
        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生write的FD
        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生except的FD

 

 

这个poll0()会监听pollWrapper中的FD有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。比如,由于pollWrapper中保存的也有ServerSocketChannelFD,所以只要ClientSocket发一份数据到ServerSocket,那么poll0()就会返回;又由于pollWrapper中保存的也有pipewrite端的FD,所以只要pipewrite端向FD发一份数据,也会造成poll0()返回;如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,所有在OperationServerrun()里要用while (true) {,这样就可以保证在selector接收到数据并处理完后继续监听poll();

这时再来看看WindowsSelectorImpl. Wakeup():

 

 

Java代码 复制代码 收藏代码
  1. public Selector wakeup() {   
  2.         synchronized (interruptLock) {   
  3.             if (!interruptTriggered) {   
  4.                 setWakeupSocket();   
  5.                 interruptTriggered = true;   
  6.             }   
  7.         }   
  8.         return this;   
  9.     }   
  10. // Sets Windows wakeup socket to a signaled state.   
  11.     private void setWakeupSocket() {   
  12.         setWakeupSocket0(wakeupSinkFd);   
  13.     }   
  14. private native void setWakeupSocket0(int wakeupSinkFd);   
  15. JNIEXPORT void JNICALL   
  16. Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,   
  17.                                                 jint scoutFd)   
  18. {   
  19.     /* Write one byte into the pipe */  
  20.     const char byte = 1;   
  21.     send(scoutFd, &byte10);   
  22. }  
public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                setWakeupSocket();
                interruptTriggered = true;
            }
        }
        return this;
    }
// Sets Windows wakeup socket to a signaled state.
    private void setWakeupSocket() {
        setWakeupSocket0(wakeupSinkFd);
    }
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}

 

可见wakeup()是通过pipewrite send(scoutFd, &byte, 1, 0),发生一个字节1,来唤醒poll()。所以在需要的时候就可以调用selector.wakeup()来唤醒selector

源于:http://goon.iteye.com/blog/1775421

分享到:
评论

相关推荐

    Java_NIO类库Selector机制解析.doc

    Java_NIO类库Selector机制解析.docJava_NIO类库Selector机制解析.docJava_NIO类库Selector机制解析.docJava_NIO类库Selector机制解析.doc

    JavaNIO库Selector机制解析.docx

    JavaNIO库Selector机制解析.docx

    Java NIO——Java NIO

    Java NIO——Java NIO——Java NIO

    Java-NIO类库Selector机制解析.docx

    "Java NIO Selector 机制解析" Java NIO(New I/O)类库是Java 1.4版本以后引入的新一代I/O机制,相比传统的I/O机制,NIO提供了高效、异步、多路复用的I/O操作模式。Selector机制是NIO类库中的一种核心机制,用于...

    Java_NIO-Selector.rar_java nio_selector

    下面我们将详细探讨Java NIO中的Selector机制。 1. **Selector的作用** Selector的主要功能是监控多个通道的状态变化,例如连接建立、数据到达或者关闭等事件。通过注册感兴趣的事件类型到Selector,程序可以在一个...

    Java Nio selector例程

    java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server发数据,server收到后分别打印收到的消息...

    java基于NIO实现Reactor模型源码.zip

    java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...

    java nio Selector的使用-客户端

    Selector是Java NIO中的核心组件之一,它允许单个线程处理多个通道(channels)的读写事件,极大地提高了服务器的并发能力。本篇文章将深入探讨如何在Java NIO中使用Selector处理客户端的I/O请求。 首先,我们需要...

    JavaNIO chm帮助文档

    Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六) Selector Java NIO系列教程(七) FileChannel Java NIO系列教程(八) ...

    java NIO.zip

    Java NIO,全称为Non-...总的来说,Java NIO提供了比传统I/O更灵活、更高效的数据传输机制,尤其适用于需要处理大量并发连接的网络应用,如服务器端的开发。通过合理利用NIO的特性,可以构建出高性能、低延迟的系统。

    Java NIO实战开发多人聊天室

    14-Java NIO-Buffer-三个属性和类型.mp4 17-Java NIO-Buffer-缓冲区分片.mp4 18-Java NIO-Buffer-只读缓冲区.mp4 19-Java NIO-Buffer-直接缓冲区.mp4 21-Java NIO-Selector-概述.mp4 23-Java NIO-Selector-示例代码...

    java NIO和java并发编程的书籍

    java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java...

    Java NIO英文高清原版

    Java NIO,全称为Non-Blocking Input/Output(非阻塞输入/输出),是Java平台中用于替代标准I/O(BIO)模型的一种新机制。NIO在Java 1.4版本引入,提供了更高效的数据处理和通道通信方式,特别适用于高并发、大数据...

    Java NIO详解及源码下载

    Java NIO(New IO)是Java 1.4版本引入的一个新模块,全称为Non-blocking Input/Output,它提供了一种不同于传统IO的编程模型,传统IO基于块I/O(Blocking I/O),而NIO则基于通道(Channels)和缓冲区(Buffers)...

    Java nio源码

    Java NIO,全称为New Input/Output,是Java在1.4版本引入的一个新特性,旨在提供一种更高效、更具选择性的I/O模型。...通过分析NIO源码,我们可以深入了解其内部工作原理,进一步优化和调试相关代码。

    Java NIO原理解析

    总结来说,Java NIO通过非阻塞I/O和选择器机制,提供了更高效、灵活的I/O处理能力,降低了系统资源消耗,尤其适合于处理高并发的网络通信和文件操作。随着Java版本的更新,NIO的功能也在不断完善,如NIO 2引入了异步...

    java NIO技巧及原理

    Java NIO(New Input/Output)是Java标准库提供的一种I/O模型,它与传统的 Blocking I/O(IO)相比,提供了更加高效的数据传输方式。在Java NIO中,"新"主要体现在非阻塞和多路复用这两个特性上,这使得NIO更适合于...

    java基于NIO选择器Selector的多人聊天室

    总的来说,这个项目展示了如何使用Java NIO的`Selector`机制来实现一个高并发的聊天应用,同时还包含了图形用户界面的设计,使得用户能够直观地参与聊天。通过学习和理解这个项目,开发者可以深入掌握Java NIO的使用...

Global site tag (gtag.js) - Google Analytics