`
Donald_Draper
  • 浏览: 986466 次
社区版块
存档分类
最新评论

WindowsSelectorImpl解析二(选择操作,通道注册,通道反注册,选择器关闭等)

    博客分类:
  • NIO
nio 
阅读更多
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper):http://donald-draper.iteye.com/blog/2370811
引言:
    上一篇文章我们简单看了一下的WindowsSelectorImpl内部集合和变量及同步锁的定义,先来回顾下,
   WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;
    pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
    FdMap主要是存储选择key的,FdMap实际上是一个HashMap,key为选择key的文件描述id,value为MapEntry,MapEntry为选择key的包装Entry,里面含有更新计数器updateCount和清除计数器clearedCount。
    PollArrayWrapper存放选择key和通道及其相关的操作事件。PollArrayWrapper通过AllocatedNativeObject来存储先关的文件描述及其兴趣事件,AllocatedNativeObject
为已分配的底层内存空间,AllocatedNativeObject的内存主要NativeObject来分配,
NativeObject实际是通过Unsafe来分配内存。PollArrayWrapper作用即存放选择key和选择key关注的事件,用选择key的文件描述id,表示选择key,文件描述id为int,所以占4个字节,选择key的兴趣操作事件也为int,即4个字节,所以SIZE_POLLFD为8,文件描述id开始位置FD_OFFSET为0,兴趣事件开始位置EVENT_OFFSET为4;FD_OFFSET和EVENT_OFFSET都是相对于SIZE_POLLFD的。PollArrayWrapper同时存储唤醒等待选择操作的选择器的通道和唤醒通道关注事件,即通道注册选择器事件,即添加选择key事件。当有通道注册到选择器,则唤醒通道,唤醒等待选择操作的选择器。
    这里我们把WindowsSelectorImpl的内部结构和构造代码贴出了,以便我们进一步跟进上一篇文章遗留的问题。
final class WindowsSelectorImpl extends SelectorImpl
{
    private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量
    private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量
    private SelectionKeyImpl channelArray[];//选择器关联通道集合
    private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合
    private int totalChannels;//注册到选择的通道数量
    private int threadsCount;//选择线程数
    private final List threads = new ArrayList();//选择操作线程集合
    private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操操的管道
    private final int wakeupSourceFd;//唤醒管道源通道文件描述
    private final int wakeupSinkFd;//唤醒管道sink通道文件描述
    private Object closeLock;//选择器关闭同步锁
    private final FdMap fdMap = new FdMap();//存放选择key文件描述与选择key映射关系的Map
    //子选择器,主要从pollWrapper拉取读写选择key,并更新key通道的就绪操作事件集
    private final SubSelector subSelector = new SubSelector();
    private long timeout;//超时时间,从pollWrapper拉取文件描述的超时时间
    private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步
    private volatile boolean interruptTriggered;//是否唤醒等待选择操的线程
    private final StartLock startLock = new StartLock();//选择操作开始锁
    private final FinishLock finishLock = new FinishLock();//选择操作结束锁
    private long updateCount;//已选择key,更新就绪操作事件计数器
    static final boolean $assertionsDisabled = !sun/nio/ch/WindowsSelectorImpl.desiredAssertionStatus();
    static 
    {
        //加载nio,net资源库
        Util.load();
    } 
}

WindowsSelectorImpl(SelectorProvider selectorprovider)
        throws IOException
    {
        super(selectorprovider);
	//创建选择器关联通道数组,实际存的为选择key
        channelArray = new SelectionKeyImpl[8];
        totalChannels = 1;//通道计数器
        threadsCount = 0;//线程计数器
        closeLock = new Object();//关闭锁
        interruptTriggered = false;
        updateCount = 0L;//更新计数器
        pollWrapper = new PollArrayWrapper(8);
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();//唤醒管道源通道文件描述id
        SinkChannelImpl sinkchannelimpl = (SinkChannelImpl)wakeupPipe.sink();//唤醒管道sink通道
        sinkchannelimpl.sc.socket().setTcpNoDelay(true);//设置唤醒管道sink通道的Socket为无延时
        wakeupSinkFd = sinkchannelimpl.getFDVal();
	//将唤醒管道的源通道文件描述id添加pollWrapper的索引0位置上
        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
我们要关注的几个方法为
1.注册key操作的implRegister方法
2.处理取消key集合方法中implDereg方法
3.选择操作中的doSelect(long l)
4.唤醒方法wakeup
5.实际关闭选择通道方法implClose
今天我们通过这几个方法,来理解interruptLock,startLock,finishLock,totalChannels,
threadsCount,updateCount,wakeupPipe(wakeupSinkFd,wakeupSourceFd),
subSelector等作用。
下面先看
1.注册key操作的implRegister方法
protected void implRegister(SelectionKeyImpl selectionkeyimpl)
{
    //同步关闭锁,以防在注册的过程中,选择器被关闭
    synchronized(closeLock)
    {
        if(pollWrapper == null)
	        //文件描述包装集合为null,即选器已关闭
            throw new ClosedSelectorException();
        growIfNeeded();//
        channelArray[totalChannels] = selectionkeyimpl;//添加到选择器通道集合
        selectionkeyimpl.setIndex(totalChannels);//设置key在选择器通道集合的索引
        fdMap.put(selectionkeyimpl);//添加选择key到文件描述fdMap
        keys.add(selectionkeyimpl);//添加key到key集合
	 //将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper
        pollWrapper.addEntry(totalChannels, selectionkeyimpl);
        totalChannels++;//通道计数器自增
    }
}

这个方法中我们需要关注的是这一句:
growIfNeeded();

 
private void growIfNeeded()
 {
     //如果选择器通道集合已满
     if(channelArray.length == totalChannels)
     {
         //扩容选择器通道集合的容量为原来的两倍
         int i = totalChannels * 2;
	 //创建新的选择器通道集合,并将原始通道集合元素,拷贝到新集合中
         SelectionKeyImpl aselectionkeyimpl[] = new SelectionKeyImpl[i];
         System.arraycopy(channelArray, 1, aselectionkeyimpl, 1, totalChannels - 1);
         channelArray = aselectionkeyimpl;
	 //扩容文件描述信息及关注操作事件包装集合pollWrapper
         pollWrapper.grow(i);
     }
     //如果通道数量为1024的整数倍
     if(totalChannels % 1024 == 0)
     {
         //添加唤醒源通道到pollWrapper的索引totalChannels
         pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
         totalChannels++;//通道计数器自增
         threadsCount++;//线程数自增
     }
 }

到这里可以看到通道计数器totalChannels,是pollWrapper中文件描述符的数量,在WindowsSelectorImpl的构造中totalChannels被初始化为1,而不是0,因为在构造是已经将wakeupSourceFd添加到pollWrapper,wakeupSourceFd在pollWrapper为位置假设为i,那么通道集合相应位置上的元素实际上为null。
从implRegister的方法,可以看出,首先同步关闭锁,以防在注册的过程中,选择器被关闭;
检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增;

再来看实际的反注册key方法
2.处理取消key集合方法中implDereg方法
protected void implDereg(SelectionKeyImpl selectionkeyimpl)
        throws IOException
    {
        //获取选择key在选择器通道集合中的索引
        int i = selectionkeyimpl.getIndex();
	//断言开启,且断言失败,i小于0,抛出断言异常
        if(!$assertionsDisabled && i < 0)
            throw new AssertionError();
        if(i != totalChannels - 1)
        {
	    //如果反注册的key不在通道集合的尾部,则与尾部交换,更新key的通道集合索引
            SelectionKeyImpl selectionkeyimpl1 = channelArray[totalChannels - 1];
            channelArray[i] = selectionkeyimpl1;
            selectionkeyimpl1.setIndex(i);
	    //并将交换信息同步到pollWrapper
            pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, pollWrapper, i);
        }
	//置空通道集合totalChannels - 1索引上的元素
        channelArray[totalChannels - 1] = null;
        totalChannels--;//通道计数器自减
        selectionkeyimpl.setIndex(-1);//设置反注册key的索引为-1,已无效
        if(totalChannels != 1 && totalChannels % 1024 == 1)
        {
	    //如果反注册后,通道计数器不为1,且当前通道为每批次(1024)的第一个通道,
	    //则通道计数器自减,线程计数器自减
            totalChannels--;
            threadsCount--;
        }
	//从fdMap,keys,selectedKeys集合移除选择key
        fdMap.remove(selectionkeyimpl);
        keys.remove(selectionkeyimpl);
        selectedKeys.remove(selectionkeyimpl);
	//将key从通道中移除
        deregister(selectionkeyimpl);
        SelectableChannel selectablechannel = selectionkeyimpl.channel();
        if(!selectablechannel.isOpen() && !selectablechannel.isRegistered())
	   //如果通道关闭,且为注册,则kill
            ((SelChImpl)selectablechannel).kill();
    }

从implDereg的方法,可以看出点线程计数器threadsCount的意思,threadCount记录的是
pollWrapper中wakeupSourceFd的数量,pollWrapper的文件描述是分批次的,每批次有1024文件描述,这个批次的文件描述的第一个为wakeupSourceFd。implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,
从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
3.选择操作中的doSelect(long l)
在看这个方法之前,因为涉及到StartLock,FinishLock,SelectThread和SubSelector,先看一下这个类的定义;
private final class SubSelector
 {
     private final int pollArrayIndex;//pollWrapper索引
     private final int readFds[];//读操作文件描述符集
     private final int writeFds[];//写操作文件描述符集
     private final int exceptFds[];
     final WindowsSelectorImpl this$0;
     private SubSelector()
     {
         this$0 = WindowsSelectorImpl.this;
         super();
         readFds = new int[1025];
         writeFds = new int[1025];
         exceptFds = new int[1025];
	 //如果索引参数(线程集合的索引),则初始化为0
         pollArrayIndex = 0;
     }
     private SubSelector(int i)
     {
         this$0 = WindowsSelectorImpl.this;
         super();
         readFds = new int[1025];
         writeFds = new int[1025];
         exceptFds = new int[1025];
	 //有索引参数,则定位pollArrayIndex的位置,每个线程
	 //处理的文件描述为1024个
         pollArrayIndex = (i + 1) * 1024;
     }
     //从pollWrapper的pollArrayAddress位置超时读取Math.min(totalChannels, 1024)个文件描述
     //到读写操作文件描述符集
     private int poll()
         throws IOException
     {
         return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, 1024), readFds, writeFds, exceptFds, timeout);
     }
     //从pollWrapper的pollArrayAddress + (long)(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD)位置
     //超时读取Math.min(1024, totalChannels - (i + 1) * 1024)个文件描述
     //到读写操作文件描述符集
     private int poll(int i)
         throws IOException
     {
         return poll0(pollWrapper.pollArrayAddress + (long)(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, totalChannels - (i + 1) * 1024), readFds, writeFds, exceptFds, timeout);
     }
     private native int poll0(long l, int i, int ai[], int ai1[], int ai2[], long l1);
     //处理选择key集合,l为选择key清除次数
     private int processSelectedKeys(long l)
     {
         int i = 0;
	 //处理读操作选择key
         i += processFDSet(l, readFds, 1, false);
	 //处理写操作选择key
         i += processFDSet(l, writeFds, 6, false);
         i += processFDSet(l, exceptFds, 7, true);
         return i;
     }
     private int processFDSet(long l, int ai[], int i, boolean flag)
     {
         int j = 0;
         for(int k = 1; k <= ai[0]; k++)
         {
             int i1 = ai[k];
             if(i1 == wakeupSourceFd)
             {
                 synchronized(interruptLock)
                 {
		     //可以唤醒等待操作的选择操作线程
                     interruptTriggered = true;
                 }
                 continue;
             }
             MapEntry mapentry = fdMap.get(i1);
             if(mapentry == null)
                 continue;
             SelectionKeyImpl selectionkeyimpl = mapentry.ski;
	     //是否丢弃i1文件描述的UrgentData
             if(flag && (selectionkeyimpl.channel() instanceof SocketChannelImpl) && discardUrgentData(i1))
                 continue;
	     //如果选择key集合selectedKeys中包括selectionkeyimpl,则根据clearedCount和当前清除计数器l
	     //判断是重新设置通道就绪事件,还是直接更新通道就绪事件
             if(selectedKeys.contains(selectionkeyimpl))
             {
                 if(mapentry.clearedCount != l)
                 {
		     //选择key的清除计数器不为l,则重新设置通道就绪事件
                     if(selectionkeyimpl.channel.translateAndSetReadyOps(i, selectionkeyimpl) && mapentry.updateCount != l)
                     {
		         //更新计数器自增
                         mapentry.updateCount = l;
                         j++;
                     }
                 } else
		 //直接更新通道就绪事件
                 if(selectionkeyimpl.channel.translateAndUpdateReadyOps(i, selectionkeyimpl) && mapentry.updateCount != l)
                 {
                     mapentry.updateCount = l;
                     j++;
                 }
                 mapentry.clearedCount = l;
                 continue;
             }
	     /如果选择key集合selectedKeys中不包括selectionkeyimpl
             if(mapentry.clearedCount != l)
             {
	         //设置通道就绪事件,并添加到selectedKeys集合中,重置更新计数器
                 selectionkeyimpl.channel.translateAndSetReadyOps(i, selectionkeyimpl);
                 if((selectionkeyimpl.nioReadyOps() & selectionkeyimpl.nioInterestOps()) != 0)
                 {
                     selectedKeys.add(selectionkeyimpl);
                     mapentry.updateCount = l;
                     j++;
                 }
             } else
             {   
	         //直接更新通道就绪事件
                 selectionkeyimpl.channel.translateAndUpdateReadyOps(i, selectionkeyimpl);
                 if((selectionkeyimpl.nioReadyOps() & selectionkeyimpl.nioInterestOps()) != 0)
                 {
                     selectedKeys.add(selectionkeyimpl);
                     mapentry.updateCount = l;
                     j++;
                 }
             }
             mapentry.clearedCount = l;
         }

         return j;
     }
}

private native boolean discardUrgentData(int i);

从上可以看出SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
//StartLock
private final class StartLock
{
    private long runsCounter;//选择操作线程计数器
    final WindowsSelectorImpl this$0;
    private StartLock()
    {
        this$0 = WindowsSelectorImpl.this;
        super();
    }
    private synchronized void startThreads()
    {
        //启动线程,唤醒所有等待开始的选择操作线程
        runsCounter++;
        notifyAll();
    }
    private synchronized boolean waitForStart(SelectThread selectthread)
    {
        while(runsCounter == selectthread.lastRun) 
            try
            {
	        //选择操作线程等待开始信号
                startLock.wait();
            }
            catch(InterruptedException interruptedexception)
            {
                Thread.currentThread().interrupt();
            }
        if(selectthread.isZombie())
        {
	    //处于等待状态
            return true;
        } else
        {
	   //选择操作线程正在执行,waitForStart返回else
            selectthread.lastRun = runsCounter;
            return false;
        }
    }
}

StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,
运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待
开始锁。
//FinishLock
private final class FinishLock
 {
     private int threadsToFinish;//选择操作线程计数器
     IOException exception;
     final WindowsSelectorImpl this$0;
     private FinishLock()
     {
         this$0 = WindowsSelectorImpl.this;
         super();
         exception = null;
     }
       //重置需要完成选择操作的线程计数器
     private void reset()
     {
         threadsToFinish = threads.size();
     }
      private synchronized void threadFinished()
     {
         if(threadsToFinish == threads.size())
	     //如果选择操作线程计数器为线程集合的大小,则唤醒等待选择操作的线程
             wakeup();
	 //选择操作线程计数器自减
         threadsToFinish--;
         if(threadsToFinish == 0)
	     //如果选择线程都执行完,则唤醒等待完成锁的线程
             notify();
     }
     private synchronized void waitForHelperThreads()
     {
         if(threadsToFinish == threads.size())
	    //唤醒等待选择操作的线程
             wakeup();
	 //如果所有选择操作没完成,则等待所有选择操作完成
         while(threadsToFinish != 0) 
             try
             {
	         //只有在所有选择操作线程都完成后,完成锁才释放。
                 finishLock.wait();
             }
             catch(InterruptedException interruptedexception)
             {
                 Thread.currentThread().interrupt();
             }
     }
     //设置选择线程执行异常
     private synchronized void setException(IOException ioexception)
     {
         exception = ioexception;
     }
     //检查异常,如果选择线程执行异常,将异常包装为IOException抛出
     private void checkForException()
         throws IOException
     {
         if(exception == null)
         {
             return;
         } else
         {
             StringBuffer stringbuffer = new StringBuffer("An exception occured during the execution of select(): \n");
             stringbuffer.append(exception);
             stringbuffer.append('\n');
             exception = null;
             throw new IOException(stringbuffer.toString());
         }
     }
 }

FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
再来看SelectThread
//SelectThread
private final class SelectThread extends Thread
 {
     //选择线程索引,即WindowsSelectorImpl中的线程集合对应的索引
     //为什么是这样,我们后面会说
     private final int index;
     final SubSelector subSelector;
     private long lastRun;//已经运行的选择操作线程数
     //选择操作线程状态,是否处于等待状态或空闲状态
     private volatile boolean zombie;
     final WindowsSelectorImpl this$0;
     private SelectThread(int i)
     {
         this$0 = WindowsSelectorImpl.this;
         super();
         lastRun = 0L;
         index = i;
         subSelector = new SubSelector(i);
         lastRun = startLock.runsCounter;
     }
     //置选择操作状态为空闲等待
     void makeZombie()
     {
         zombie = true;
     }
     boolean isZombie()
     {
         return zombie;
     }
     public void run()
     {
         do
         {
	     //如果需要等待开始锁,则直接返回
             if(startLock.waitForStart(this))
                 return;
             try
             {
	         //否则拉取索引index批次的选择key
                 subSelector.poll(index);
             }
             catch(IOException ioexception)
             {
                 finishLock.setException(ioexception);
             }
	     //完成,更新完成选择操作线程计数器,自减
             finishLock.threadFinished();
         } while(true);
     }
 }

SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。

当这里我们已经把StartLock,FinishLock,SelectThread和SubSelector看完,再来看
选择操作中的doSelect(long l)
protected int doSelect(long l)
    throws IOException
{
    if(channelArray == null)
        //选择器已关闭
        throw new ClosedSelectorException();
    timeout = l;//设置超时时间
    //反注册已经取消的选key
    processDeregisterQueue();
    if(interruptTriggered)
    {
        //已触发中断,则重新设置唤醒通道
        resetWakeupSocket();
        return 0;
    }
    //调整选择线程数量
    adjustThreadsCount();
    //重置完成锁需要完整的选择线程数量
    finishLock.reset();
    //启动所有等待选择操作的线程
    startLock.startThreads();
    begin();//这个方法与end方法配合使用记录在io操作的过程中是否被中断
    try
    {
        //pollWrapper的起始位置拉取读写选择key
        subSelector.poll();
    }
    catch(IOException ioexception)
    {
        finishLock.setException(ioexception);
    }
    //如果选择线程不为空,则等待所有选择线程结束
    if(threads.size() > 0)
        finishLock.waitForHelperThreads();
    end();
    break MISSING_BLOCK_LABEL_114;
    Exception exception;
    exception;
    end();
    throw exception;
    //检查异常
    finishLock.checkForException();
    processDeregisterQueue();
    //更新已准备就绪的通道操作事件
    int i = updateSelectedKeys();
    resetWakeupSocket();
    return i;
}

这个方法中我们有几点要看
3.1
//已触发中断,则重新设置唤醒通道
 resetWakeupSocket();

3.2
//调整选择线程数量
adjustThreadsCount();

3.3
//更新已准备就绪的通道操作事件
int i = updateSelectedKeys();

下面分别来看这几点:
3.1
//已触发中断,则重新设置唤醒通道
resetWakeupSocket();


private void resetWakeupSocket()
    {
label0:
        {
            synchronized(interruptLock)
            {
                if(interruptTriggered)
                    break label0;
            }
            return;
        }
	//重新设置唤醒通道文件描述符
        resetWakeupSocket0(wakeupSourceFd);
        interruptTriggered = false;
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }

private native void resetWakeupSocket0(int i);


3.2
//调整选择线程数量
adjustThreadsCount();

 private void adjustThreadsCount()
    {
        if(threadsCount > threads.size())
        {
	     //需要的线程数量大于线程集合实际线程数,补充选择操作线程
            for(int i = threads.size(); i < threadsCount; i++)
            {
                SelectThread selectthread = new SelectThread(i);
                threads.add(selectthread);
                selectthread.setDaemon(true);
                selectthread.start();
            }

        } else
        if(threadsCount < threads.size())
        {
	   //否则从选择线程集合中移除不必要的选择线程,并设置状态为Zombie(空闲)
            for(int j = threads.size() - 1; j >= threadsCount; j--)
                ((SelectThread)threads.remove(j)).makeZombie();

        }
    }

3.3
//更新已准备就绪的通道操作事件
int i = updateSelectedKeys();


private int updateSelectedKeys()
    {
        updateCount++;
        int i = 0
        i += subSelector.processSelectedKeys(updateCount);
	//遍历选择线程集,更新子选择线程中已经就绪的通道操作事件
        for(Iterator iterator = threads.iterator(); iterator.hasNext();)
        {
            SelectThread selectthread = (SelectThread)iterator.next();
	    //更新通道已经就绪的操作事件
            i += selectthread.subSelector.processSelectedKeys(updateCount);
        }

        return i;
    }

doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用,SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。

4.唤醒方法wakeup
public Selector wakeup()
    {
        synchronized(interruptLock)
        {
            if(!interruptTriggered)
            {
	        //设置唤醒sink通道
                setWakeupSocket();
                interruptTriggered = true;
            }
        }
        return this;
    }
    private void setWakeupSocket()
    {
        //唤醒所有等待选择操作的线程
        setWakeupSocket0(wakeupSinkFd);
    }
    private native void setWakeupSocket0(int i);

wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
5.实际关闭选择通道方法implClose
    protected void implClose()
        throws IOException
    {
        synchronized(closeLock)
        {
            if(channelArray != null && pollWrapper != null)
            {
                synchronized(interruptLock)
                {
                    interruptTriggered = true;
                }
		//关闭唤醒管道的source和sink通道
                wakeupPipe.sink().close();
                wakeupPipe.source().close();
                for(int i = 1; i < totalChannels; i++)
                {
                    if(i % 1024 == 0)
		        //1024的整数位置为唤醒source通道,跳过
                        continue;
		    //反注册通道
                    deregister(channelArray[i]);
                    SelectableChannel selectablechannel = channelArray[i].channel();
                    if(!selectablechannel.isOpen() && !selectablechannel.isRegistered())
                        ((SelChImpl)selectablechannel).kill();
                }
                //释放pollWrapper空间
                pollWrapper.free();
                pollWrapper = null;
                selectedKeys = null;
                channelArray = null;
                SelectThread selectthread;
		//结束所有选择线程集合中的线程
                for(Iterator iterator = threads.iterator(); iterator.hasNext(); selectthread.makeZombie())
                    selectthread = (SelectThread)iterator.next();

                startLock.startThreads();
            }
        }
 }

implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程。
再回到WindowsSelectorImpl集合,变量声明和构造
final class WindowsSelectorImpl extends SelectorImpl
{
    private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量
    private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量
    private SelectionKeyImpl channelArray[];//选择器关联通道集合
    private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合
    private int totalChannels;//注册到选择的通道数量
    private int threadsCount;//选择线程数
    private final List threads = new ArrayList();//选择操作线程集合
    private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操操的管道
    private final int wakeupSourceFd;//唤醒管道源通道文件描述
    private final int wakeupSinkFd;//唤醒管道sink通道文件描述
    private Object closeLock;//选择器关闭同步锁
    private final FdMap fdMap = new FdMap();//存放选择key文件描述与选择key映射关系的Map
    //子选择器,主要从pollWrapper拉取读写选择key,并更新key通道的就绪操作事件集
    private final SubSelector subSelector = new SubSelector();
    private long timeout;//超时时间,从pollWrapper拉取文件描述的超时时间
    private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步
    private volatile boolean interruptTriggered;//是否唤醒等待选择操的线程
    private final StartLock startLock = new StartLock();//选择操作开始锁
    private final FinishLock finishLock = new FinishLock();//选择操作结束锁
    private long updateCount;//已选择key,更新就绪操作事件计数器
    static final boolean $assertionsDisabled = !sun/nio/ch/WindowsSelectorImpl.desiredAssertionStatus();
    static 
    {
        //加载nio,net资源库
        Util.load();
    } 
}

WindowsSelectorImpl(SelectorProvider selectorprovider)
        throws IOException
    {
        super(selectorprovider);
	//创建选择器关联通道数组,实际存的为选择key
        channelArray = new SelectionKeyImpl[8];
        totalChannels = 1;//通道计数器
        threadsCount = 0;//线程计数器
        closeLock = new Object();//关闭锁
        interruptTriggered = false;//唤醒是否触发
        updateCount = 0L;//更新计数器
        pollWrapper = new PollArrayWrapper(8);
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();//唤醒管道源通道文件描述id
        SinkChannelImpl sinkchannelimpl = (SinkChannelImpl)wakeupPipe.sink();//唤醒管道sink通道
        sinkchannelimpl.sc.socket().setTcpNoDelay(true);//设置唤醒管道sink通道的Socket为无延时
        wakeupSinkFd = sinkchannelimpl.getFDVal();
	//将唤醒管道的源通道文件描述id添加pollWrapper的索引0位置上
        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,选择操作开始锁startLock,选择操作结束finishLock控制;一个唤醒管道(wakeupSourceFd,wakeupSinkFd),所有阻塞选择操的子选择线程与wakeupSourceFd相关联;唤醒方法主要是通过Sink通道发送消息给source通道,以唤醒所有选择操作线程。注册到选择器的通道计数器totalChannels;updateCount计数器,已选择key集合更新的记录数;通道集合channelArray,存放的元素实际为通道关联的选择key;

总结:
    implRegister方法,首先同步关闭锁,以防在注册的过程中,选择器被关闭;检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增。
    implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
    SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;
processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
    StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待开始锁。
    FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
   SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。
   doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。
   wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
   implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程


1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics