- 浏览: 982346 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper):http://donald-draper.iteye.com/blog/2370811
引言:
上一篇文章我们简单看了一下的WindowsSelectorImpl内部集合和变量及同步锁的定义,先来回顾下,
WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;
pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
FdMap主要是存储选择key的,FdMap实际上是一个HashMap,key为选择key的文件描述id,value为MapEntry,MapEntry为选择key的包装Entry,里面含有更新计数器updateCount和清除计数器clearedCount。
PollArrayWrapper存放选择key和通道及其相关的操作事件。PollArrayWrapper通过AllocatedNativeObject来存储先关的文件描述及其兴趣事件,AllocatedNativeObject
为已分配的底层内存空间,AllocatedNativeObject的内存主要NativeObject来分配,
NativeObject实际是通过Unsafe来分配内存。PollArrayWrapper作用即存放选择key和选择key关注的事件,用选择key的文件描述id,表示选择key,文件描述id为int,所以占4个字节,选择key的兴趣操作事件也为int,即4个字节,所以SIZE_POLLFD为8,文件描述id开始位置FD_OFFSET为0,兴趣事件开始位置EVENT_OFFSET为4;FD_OFFSET和EVENT_OFFSET都是相对于SIZE_POLLFD的。PollArrayWrapper同时存储唤醒等待选择操作的选择器的通道和唤醒通道关注事件,即通道注册选择器事件,即添加选择key事件。当有通道注册到选择器,则唤醒通道,唤醒等待选择操作的选择器。
这里我们把WindowsSelectorImpl的内部结构和构造代码贴出了,以便我们进一步跟进上一篇文章遗留的问题。
WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
我们要关注的几个方法为
1.注册key操作的implRegister方法
2.处理取消key集合方法中implDereg方法
3.选择操作中的doSelect(long l)
4.唤醒方法wakeup
5.实际关闭选择通道方法implClose
今天我们通过这几个方法,来理解interruptLock,startLock,finishLock,totalChannels,
threadsCount,updateCount,wakeupPipe(wakeupSinkFd,wakeupSourceFd),
subSelector等作用。
下面先看
1.注册key操作的implRegister方法
这个方法中我们需要关注的是这一句:
到这里可以看到通道计数器totalChannels,是pollWrapper中文件描述符的数量,在WindowsSelectorImpl的构造中totalChannels被初始化为1,而不是0,因为在构造是已经将wakeupSourceFd添加到pollWrapper,wakeupSourceFd在pollWrapper为位置假设为i,那么通道集合相应位置上的元素实际上为null。
从implRegister的方法,可以看出,首先同步关闭锁,以防在注册的过程中,选择器被关闭;
检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增;
再来看实际的反注册key方法
2.处理取消key集合方法中implDereg方法
从implDereg的方法,可以看出点线程计数器threadsCount的意思,threadCount记录的是
pollWrapper中wakeupSourceFd的数量,pollWrapper的文件描述是分批次的,每批次有1024文件描述,这个批次的文件描述的第一个为wakeupSourceFd。implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,
从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
3.选择操作中的doSelect(long l)
在看这个方法之前,因为涉及到StartLock,FinishLock,SelectThread和SubSelector,先看一下这个类的定义;
从上可以看出SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
//StartLock
StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,
运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待
开始锁。
//FinishLock
FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
再来看SelectThread
//SelectThread
SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。
当这里我们已经把StartLock,FinishLock,SelectThread和SubSelector看完,再来看
选择操作中的doSelect(long l)
这个方法中我们有几点要看
3.1
//已触发中断,则重新设置唤醒通道
3.2
//调整选择线程数量
3.3
//更新已准备就绪的通道操作事件
下面分别来看这几点:
3.1
//已触发中断,则重新设置唤醒通道
3.2
//调整选择线程数量
3.3
//更新已准备就绪的通道操作事件
doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用,SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。
4.唤醒方法wakeup
wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
5.实际关闭选择通道方法implClose
implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程。
再回到WindowsSelectorImpl集合,变量声明和构造
WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,选择操作开始锁startLock,选择操作结束finishLock控制;一个唤醒管道(wakeupSourceFd,wakeupSinkFd),所有阻塞选择操的子选择线程与wakeupSourceFd相关联;唤醒方法主要是通过Sink通道发送消息给source通道,以唤醒所有选择操作线程。注册到选择器的通道计数器totalChannels;updateCount计数器,已选择key集合更新的记录数;通道集合channelArray,存放的元素实际为通道关联的选择key;
总结:
implRegister方法,首先同步关闭锁,以防在注册的过程中,选择器被关闭;检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增。
implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;
processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待开始锁。
FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。
doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。
wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程
引言:
上一篇文章我们简单看了一下的WindowsSelectorImpl内部集合和变量及同步锁的定义,先来回顾下,
WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;
pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
FdMap主要是存储选择key的,FdMap实际上是一个HashMap,key为选择key的文件描述id,value为MapEntry,MapEntry为选择key的包装Entry,里面含有更新计数器updateCount和清除计数器clearedCount。
PollArrayWrapper存放选择key和通道及其相关的操作事件。PollArrayWrapper通过AllocatedNativeObject来存储先关的文件描述及其兴趣事件,AllocatedNativeObject
为已分配的底层内存空间,AllocatedNativeObject的内存主要NativeObject来分配,
NativeObject实际是通过Unsafe来分配内存。PollArrayWrapper作用即存放选择key和选择key关注的事件,用选择key的文件描述id,表示选择key,文件描述id为int,所以占4个字节,选择key的兴趣操作事件也为int,即4个字节,所以SIZE_POLLFD为8,文件描述id开始位置FD_OFFSET为0,兴趣事件开始位置EVENT_OFFSET为4;FD_OFFSET和EVENT_OFFSET都是相对于SIZE_POLLFD的。PollArrayWrapper同时存储唤醒等待选择操作的选择器的通道和唤醒通道关注事件,即通道注册选择器事件,即添加选择key事件。当有通道注册到选择器,则唤醒通道,唤醒等待选择操作的选择器。
这里我们把WindowsSelectorImpl的内部结构和构造代码贴出了,以便我们进一步跟进上一篇文章遗留的问题。
final class WindowsSelectorImpl extends SelectorImpl { private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量 private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量 private SelectionKeyImpl channelArray[];//选择器关联通道集合 private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合 private int totalChannels;//注册到选择的通道数量 private int threadsCount;//选择线程数 private final List threads = new ArrayList();//选择操作线程集合 private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操操的管道 private final int wakeupSourceFd;//唤醒管道源通道文件描述 private final int wakeupSinkFd;//唤醒管道sink通道文件描述 private Object closeLock;//选择器关闭同步锁 private final FdMap fdMap = new FdMap();//存放选择key文件描述与选择key映射关系的Map //子选择器,主要从pollWrapper拉取读写选择key,并更新key通道的就绪操作事件集 private final SubSelector subSelector = new SubSelector(); private long timeout;//超时时间,从pollWrapper拉取文件描述的超时时间 private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步 private volatile boolean interruptTriggered;//是否唤醒等待选择操的线程 private final StartLock startLock = new StartLock();//选择操作开始锁 private final FinishLock finishLock = new FinishLock();//选择操作结束锁 private long updateCount;//已选择key,更新就绪操作事件计数器 static final boolean $assertionsDisabled = !sun/nio/ch/WindowsSelectorImpl.desiredAssertionStatus(); static { //加载nio,net资源库 Util.load(); } }
WindowsSelectorImpl(SelectorProvider selectorprovider) throws IOException { super(selectorprovider); //创建选择器关联通道数组,实际存的为选择key channelArray = new SelectionKeyImpl[8]; totalChannels = 1;//通道计数器 threadsCount = 0;//线程计数器 closeLock = new Object();//关闭锁 interruptTriggered = false; updateCount = 0L;//更新计数器 pollWrapper = new PollArrayWrapper(8); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();//唤醒管道源通道文件描述id SinkChannelImpl sinkchannelimpl = (SinkChannelImpl)wakeupPipe.sink();//唤醒管道sink通道 sinkchannelimpl.sc.socket().setTcpNoDelay(true);//设置唤醒管道sink通道的Socket为无延时 wakeupSinkFd = sinkchannelimpl.getFDVal(); //将唤醒管道的源通道文件描述id添加pollWrapper的索引0位置上 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }
WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
我们要关注的几个方法为
1.注册key操作的implRegister方法
2.处理取消key集合方法中implDereg方法
3.选择操作中的doSelect(long l)
4.唤醒方法wakeup
5.实际关闭选择通道方法implClose
今天我们通过这几个方法,来理解interruptLock,startLock,finishLock,totalChannels,
threadsCount,updateCount,wakeupPipe(wakeupSinkFd,wakeupSourceFd),
subSelector等作用。
下面先看
1.注册key操作的implRegister方法
protected void implRegister(SelectionKeyImpl selectionkeyimpl) { //同步关闭锁,以防在注册的过程中,选择器被关闭 synchronized(closeLock) { if(pollWrapper == null) //文件描述包装集合为null,即选器已关闭 throw new ClosedSelectorException(); growIfNeeded();// channelArray[totalChannels] = selectionkeyimpl;//添加到选择器通道集合 selectionkeyimpl.setIndex(totalChannels);//设置key在选择器通道集合的索引 fdMap.put(selectionkeyimpl);//添加选择key到文件描述fdMap keys.add(selectionkeyimpl);//添加key到key集合 //将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper pollWrapper.addEntry(totalChannels, selectionkeyimpl); totalChannels++;//通道计数器自增 } }
这个方法中我们需要关注的是这一句:
growIfNeeded();
private void growIfNeeded() { //如果选择器通道集合已满 if(channelArray.length == totalChannels) { //扩容选择器通道集合的容量为原来的两倍 int i = totalChannels * 2; //创建新的选择器通道集合,并将原始通道集合元素,拷贝到新集合中 SelectionKeyImpl aselectionkeyimpl[] = new SelectionKeyImpl[i]; System.arraycopy(channelArray, 1, aselectionkeyimpl, 1, totalChannels - 1); channelArray = aselectionkeyimpl; //扩容文件描述信息及关注操作事件包装集合pollWrapper pollWrapper.grow(i); } //如果通道数量为1024的整数倍 if(totalChannels % 1024 == 0) { //添加唤醒源通道到pollWrapper的索引totalChannels pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); totalChannels++;//通道计数器自增 threadsCount++;//线程数自增 } }
到这里可以看到通道计数器totalChannels,是pollWrapper中文件描述符的数量,在WindowsSelectorImpl的构造中totalChannels被初始化为1,而不是0,因为在构造是已经将wakeupSourceFd添加到pollWrapper,wakeupSourceFd在pollWrapper为位置假设为i,那么通道集合相应位置上的元素实际上为null。
从implRegister的方法,可以看出,首先同步关闭锁,以防在注册的过程中,选择器被关闭;
检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增;
再来看实际的反注册key方法
2.处理取消key集合方法中implDereg方法
protected void implDereg(SelectionKeyImpl selectionkeyimpl) throws IOException { //获取选择key在选择器通道集合中的索引 int i = selectionkeyimpl.getIndex(); //断言开启,且断言失败,i小于0,抛出断言异常 if(!$assertionsDisabled && i < 0) throw new AssertionError(); if(i != totalChannels - 1) { //如果反注册的key不在通道集合的尾部,则与尾部交换,更新key的通道集合索引 SelectionKeyImpl selectionkeyimpl1 = channelArray[totalChannels - 1]; channelArray[i] = selectionkeyimpl1; selectionkeyimpl1.setIndex(i); //并将交换信息同步到pollWrapper pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, pollWrapper, i); } //置空通道集合totalChannels - 1索引上的元素 channelArray[totalChannels - 1] = null; totalChannels--;//通道计数器自减 selectionkeyimpl.setIndex(-1);//设置反注册key的索引为-1,已无效 if(totalChannels != 1 && totalChannels % 1024 == 1) { //如果反注册后,通道计数器不为1,且当前通道为每批次(1024)的第一个通道, //则通道计数器自减,线程计数器自减 totalChannels--; threadsCount--; } //从fdMap,keys,selectedKeys集合移除选择key fdMap.remove(selectionkeyimpl); keys.remove(selectionkeyimpl); selectedKeys.remove(selectionkeyimpl); //将key从通道中移除 deregister(selectionkeyimpl); SelectableChannel selectablechannel = selectionkeyimpl.channel(); if(!selectablechannel.isOpen() && !selectablechannel.isRegistered()) //如果通道关闭,且为注册,则kill ((SelChImpl)selectablechannel).kill(); }
从implDereg的方法,可以看出点线程计数器threadsCount的意思,threadCount记录的是
pollWrapper中wakeupSourceFd的数量,pollWrapper的文件描述是分批次的,每批次有1024文件描述,这个批次的文件描述的第一个为wakeupSourceFd。implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,
从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
3.选择操作中的doSelect(long l)
在看这个方法之前,因为涉及到StartLock,FinishLock,SelectThread和SubSelector,先看一下这个类的定义;
private final class SubSelector { private final int pollArrayIndex;//pollWrapper索引 private final int readFds[];//读操作文件描述符集 private final int writeFds[];//写操作文件描述符集 private final int exceptFds[]; final WindowsSelectorImpl this$0; private SubSelector() { this$0 = WindowsSelectorImpl.this; super(); readFds = new int[1025]; writeFds = new int[1025]; exceptFds = new int[1025]; //如果索引参数(线程集合的索引),则初始化为0 pollArrayIndex = 0; } private SubSelector(int i) { this$0 = WindowsSelectorImpl.this; super(); readFds = new int[1025]; writeFds = new int[1025]; exceptFds = new int[1025]; //有索引参数,则定位pollArrayIndex的位置,每个线程 //处理的文件描述为1024个 pollArrayIndex = (i + 1) * 1024; } //从pollWrapper的pollArrayAddress位置超时读取Math.min(totalChannels, 1024)个文件描述 //到读写操作文件描述符集 private int poll() throws IOException { return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, 1024), readFds, writeFds, exceptFds, timeout); } //从pollWrapper的pollArrayAddress + (long)(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD)位置 //超时读取Math.min(1024, totalChannels - (i + 1) * 1024)个文件描述 //到读写操作文件描述符集 private int poll(int i) throws IOException { return poll0(pollWrapper.pollArrayAddress + (long)(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, totalChannels - (i + 1) * 1024), readFds, writeFds, exceptFds, timeout); } private native int poll0(long l, int i, int ai[], int ai1[], int ai2[], long l1); //处理选择key集合,l为选择key清除次数 private int processSelectedKeys(long l) { int i = 0; //处理读操作选择key i += processFDSet(l, readFds, 1, false); //处理写操作选择key i += processFDSet(l, writeFds, 6, false); i += processFDSet(l, exceptFds, 7, true); return i; } private int processFDSet(long l, int ai[], int i, boolean flag) { int j = 0; for(int k = 1; k <= ai[0]; k++) { int i1 = ai[k]; if(i1 == wakeupSourceFd) { synchronized(interruptLock) { //可以唤醒等待操作的选择操作线程 interruptTriggered = true; } continue; } MapEntry mapentry = fdMap.get(i1); if(mapentry == null) continue; SelectionKeyImpl selectionkeyimpl = mapentry.ski; //是否丢弃i1文件描述的UrgentData if(flag && (selectionkeyimpl.channel() instanceof SocketChannelImpl) && discardUrgentData(i1)) continue; //如果选择key集合selectedKeys中包括selectionkeyimpl,则根据clearedCount和当前清除计数器l //判断是重新设置通道就绪事件,还是直接更新通道就绪事件 if(selectedKeys.contains(selectionkeyimpl)) { if(mapentry.clearedCount != l) { //选择key的清除计数器不为l,则重新设置通道就绪事件 if(selectionkeyimpl.channel.translateAndSetReadyOps(i, selectionkeyimpl) && mapentry.updateCount != l) { //更新计数器自增 mapentry.updateCount = l; j++; } } else //直接更新通道就绪事件 if(selectionkeyimpl.channel.translateAndUpdateReadyOps(i, selectionkeyimpl) && mapentry.updateCount != l) { mapentry.updateCount = l; j++; } mapentry.clearedCount = l; continue; } /如果选择key集合selectedKeys中不包括selectionkeyimpl if(mapentry.clearedCount != l) { //设置通道就绪事件,并添加到selectedKeys集合中,重置更新计数器 selectionkeyimpl.channel.translateAndSetReadyOps(i, selectionkeyimpl); if((selectionkeyimpl.nioReadyOps() & selectionkeyimpl.nioInterestOps()) != 0) { selectedKeys.add(selectionkeyimpl); mapentry.updateCount = l; j++; } } else { //直接更新通道就绪事件 selectionkeyimpl.channel.translateAndUpdateReadyOps(i, selectionkeyimpl); if((selectionkeyimpl.nioReadyOps() & selectionkeyimpl.nioInterestOps()) != 0) { selectedKeys.add(selectionkeyimpl); mapentry.updateCount = l; j++; } } mapentry.clearedCount = l; } return j; } }
private native boolean discardUrgentData(int i);
从上可以看出SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
//StartLock
private final class StartLock { private long runsCounter;//选择操作线程计数器 final WindowsSelectorImpl this$0; private StartLock() { this$0 = WindowsSelectorImpl.this; super(); } private synchronized void startThreads() { //启动线程,唤醒所有等待开始的选择操作线程 runsCounter++; notifyAll(); } private synchronized boolean waitForStart(SelectThread selectthread) { while(runsCounter == selectthread.lastRun) try { //选择操作线程等待开始信号 startLock.wait(); } catch(InterruptedException interruptedexception) { Thread.currentThread().interrupt(); } if(selectthread.isZombie()) { //处于等待状态 return true; } else { //选择操作线程正在执行,waitForStart返回else selectthread.lastRun = runsCounter; return false; } } }
StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,
运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待
开始锁。
//FinishLock
private final class FinishLock { private int threadsToFinish;//选择操作线程计数器 IOException exception; final WindowsSelectorImpl this$0; private FinishLock() { this$0 = WindowsSelectorImpl.this; super(); exception = null; } //重置需要完成选择操作的线程计数器 private void reset() { threadsToFinish = threads.size(); } private synchronized void threadFinished() { if(threadsToFinish == threads.size()) //如果选择操作线程计数器为线程集合的大小,则唤醒等待选择操作的线程 wakeup(); //选择操作线程计数器自减 threadsToFinish--; if(threadsToFinish == 0) //如果选择线程都执行完,则唤醒等待完成锁的线程 notify(); } private synchronized void waitForHelperThreads() { if(threadsToFinish == threads.size()) //唤醒等待选择操作的线程 wakeup(); //如果所有选择操作没完成,则等待所有选择操作完成 while(threadsToFinish != 0) try { //只有在所有选择操作线程都完成后,完成锁才释放。 finishLock.wait(); } catch(InterruptedException interruptedexception) { Thread.currentThread().interrupt(); } } //设置选择线程执行异常 private synchronized void setException(IOException ioexception) { exception = ioexception; } //检查异常,如果选择线程执行异常,将异常包装为IOException抛出 private void checkForException() throws IOException { if(exception == null) { return; } else { StringBuffer stringbuffer = new StringBuffer("An exception occured during the execution of select(): \n"); stringbuffer.append(exception); stringbuffer.append('\n'); exception = null; throw new IOException(stringbuffer.toString()); } } }
FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
再来看SelectThread
//SelectThread
private final class SelectThread extends Thread { //选择线程索引,即WindowsSelectorImpl中的线程集合对应的索引 //为什么是这样,我们后面会说 private final int index; final SubSelector subSelector; private long lastRun;//已经运行的选择操作线程数 //选择操作线程状态,是否处于等待状态或空闲状态 private volatile boolean zombie; final WindowsSelectorImpl this$0; private SelectThread(int i) { this$0 = WindowsSelectorImpl.this; super(); lastRun = 0L; index = i; subSelector = new SubSelector(i); lastRun = startLock.runsCounter; } //置选择操作状态为空闲等待 void makeZombie() { zombie = true; } boolean isZombie() { return zombie; } public void run() { do { //如果需要等待开始锁,则直接返回 if(startLock.waitForStart(this)) return; try { //否则拉取索引index批次的选择key subSelector.poll(index); } catch(IOException ioexception) { finishLock.setException(ioexception); } //完成,更新完成选择操作线程计数器,自减 finishLock.threadFinished(); } while(true); } }
SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。
当这里我们已经把StartLock,FinishLock,SelectThread和SubSelector看完,再来看
选择操作中的doSelect(long l)
protected int doSelect(long l) throws IOException { if(channelArray == null) //选择器已关闭 throw new ClosedSelectorException(); timeout = l;//设置超时时间 //反注册已经取消的选key processDeregisterQueue(); if(interruptTriggered) { //已触发中断,则重新设置唤醒通道 resetWakeupSocket(); return 0; } //调整选择线程数量 adjustThreadsCount(); //重置完成锁需要完整的选择线程数量 finishLock.reset(); //启动所有等待选择操作的线程 startLock.startThreads(); begin();//这个方法与end方法配合使用记录在io操作的过程中是否被中断 try { //pollWrapper的起始位置拉取读写选择key subSelector.poll(); } catch(IOException ioexception) { finishLock.setException(ioexception); } //如果选择线程不为空,则等待所有选择线程结束 if(threads.size() > 0) finishLock.waitForHelperThreads(); end(); break MISSING_BLOCK_LABEL_114; Exception exception; exception; end(); throw exception; //检查异常 finishLock.checkForException(); processDeregisterQueue(); //更新已准备就绪的通道操作事件 int i = updateSelectedKeys(); resetWakeupSocket(); return i; }
这个方法中我们有几点要看
3.1
//已触发中断,则重新设置唤醒通道
resetWakeupSocket();
3.2
//调整选择线程数量
adjustThreadsCount();
3.3
//更新已准备就绪的通道操作事件
int i = updateSelectedKeys();
下面分别来看这几点:
3.1
//已触发中断,则重新设置唤醒通道
resetWakeupSocket();
private void resetWakeupSocket() { label0: { synchronized(interruptLock) { if(interruptTriggered) break label0; } return; } //重新设置唤醒通道文件描述符 resetWakeupSocket0(wakeupSourceFd); interruptTriggered = false; obj; JVM INSTR monitorexit ; goto _L1 exception; throw exception; _L1: }
private native void resetWakeupSocket0(int i);
3.2
//调整选择线程数量
adjustThreadsCount();
private void adjustThreadsCount() { if(threadsCount > threads.size()) { //需要的线程数量大于线程集合实际线程数,补充选择操作线程 for(int i = threads.size(); i < threadsCount; i++) { SelectThread selectthread = new SelectThread(i); threads.add(selectthread); selectthread.setDaemon(true); selectthread.start(); } } else if(threadsCount < threads.size()) { //否则从选择线程集合中移除不必要的选择线程,并设置状态为Zombie(空闲) for(int j = threads.size() - 1; j >= threadsCount; j--) ((SelectThread)threads.remove(j)).makeZombie(); } }
3.3
//更新已准备就绪的通道操作事件
int i = updateSelectedKeys();
private int updateSelectedKeys() { updateCount++; int i = 0 i += subSelector.processSelectedKeys(updateCount); //遍历选择线程集,更新子选择线程中已经就绪的通道操作事件 for(Iterator iterator = threads.iterator(); iterator.hasNext();) { SelectThread selectthread = (SelectThread)iterator.next(); //更新通道已经就绪的操作事件 i += selectthread.subSelector.processSelectedKeys(updateCount); } return i; }
doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用,SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。
4.唤醒方法wakeup
public Selector wakeup() { synchronized(interruptLock) { if(!interruptTriggered) { //设置唤醒sink通道 setWakeupSocket(); interruptTriggered = true; } } return this; } private void setWakeupSocket() { //唤醒所有等待选择操作的线程 setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int i);
wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
5.实际关闭选择通道方法implClose
protected void implClose() throws IOException { synchronized(closeLock) { if(channelArray != null && pollWrapper != null) { synchronized(interruptLock) { interruptTriggered = true; } //关闭唤醒管道的source和sink通道 wakeupPipe.sink().close(); wakeupPipe.source().close(); for(int i = 1; i < totalChannels; i++) { if(i % 1024 == 0) //1024的整数位置为唤醒source通道,跳过 continue; //反注册通道 deregister(channelArray[i]); SelectableChannel selectablechannel = channelArray[i].channel(); if(!selectablechannel.isOpen() && !selectablechannel.isRegistered()) ((SelChImpl)selectablechannel).kill(); } //释放pollWrapper空间 pollWrapper.free(); pollWrapper = null; selectedKeys = null; channelArray = null; SelectThread selectthread; //结束所有选择线程集合中的线程 for(Iterator iterator = threads.iterator(); iterator.hasNext(); selectthread.makeZombie()) selectthread = (SelectThread)iterator.next(); startLock.startThreads(); } } }
implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程。
再回到WindowsSelectorImpl集合,变量声明和构造
final class WindowsSelectorImpl extends SelectorImpl { private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量 private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量 private SelectionKeyImpl channelArray[];//选择器关联通道集合 private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合 private int totalChannels;//注册到选择的通道数量 private int threadsCount;//选择线程数 private final List threads = new ArrayList();//选择操作线程集合 private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操操的管道 private final int wakeupSourceFd;//唤醒管道源通道文件描述 private final int wakeupSinkFd;//唤醒管道sink通道文件描述 private Object closeLock;//选择器关闭同步锁 private final FdMap fdMap = new FdMap();//存放选择key文件描述与选择key映射关系的Map //子选择器,主要从pollWrapper拉取读写选择key,并更新key通道的就绪操作事件集 private final SubSelector subSelector = new SubSelector(); private long timeout;//超时时间,从pollWrapper拉取文件描述的超时时间 private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步 private volatile boolean interruptTriggered;//是否唤醒等待选择操的线程 private final StartLock startLock = new StartLock();//选择操作开始锁 private final FinishLock finishLock = new FinishLock();//选择操作结束锁 private long updateCount;//已选择key,更新就绪操作事件计数器 static final boolean $assertionsDisabled = !sun/nio/ch/WindowsSelectorImpl.desiredAssertionStatus(); static { //加载nio,net资源库 Util.load(); } }
WindowsSelectorImpl(SelectorProvider selectorprovider) throws IOException { super(selectorprovider); //创建选择器关联通道数组,实际存的为选择key channelArray = new SelectionKeyImpl[8]; totalChannels = 1;//通道计数器 threadsCount = 0;//线程计数器 closeLock = new Object();//关闭锁 interruptTriggered = false;//唤醒是否触发 updateCount = 0L;//更新计数器 pollWrapper = new PollArrayWrapper(8); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();//唤醒管道源通道文件描述id SinkChannelImpl sinkchannelimpl = (SinkChannelImpl)wakeupPipe.sink();//唤醒管道sink通道 sinkchannelimpl.sc.socket().setTcpNoDelay(true);//设置唤醒管道sink通道的Socket为无延时 wakeupSinkFd = sinkchannelimpl.getFDVal(); //将唤醒管道的源通道文件描述id添加pollWrapper的索引0位置上 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }
WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,选择操作开始锁startLock,选择操作结束finishLock控制;一个唤醒管道(wakeupSourceFd,wakeupSinkFd),所有阻塞选择操的子选择线程与wakeupSourceFd相关联;唤醒方法主要是通过Sink通道发送消息给source通道,以唤醒所有选择操作线程。注册到选择器的通道计数器totalChannels;updateCount计数器,已选择key集合更新的记录数;通道集合channelArray,存放的元素实际为通道关联的选择key;
总结:
implRegister方法,首先同步关闭锁,以防在注册的过程中,选择器被关闭;检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增。
implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;
processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待开始锁。
FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。
doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。
wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程
发表评论
-
文件通道解析二(文件锁,关闭通道)
2017-05-16 23:17 1069文件通道解析一(读写操作,通道数据传输等):http://do ... -
文件通道解析一(读写操作,通道数据传输等)
2017-05-16 10:04 1165Reference定义(PhantomRefere ... -
文件通道创建方式综述
2017-05-15 17:39 1070Reference定义(PhantomReference,Cl ... -
文件读写方式简单综述后续(文件,流构造)
2017-05-14 23:04 1486Java Socket通信实例:http://donald-d ... -
文件读写方式简单综述
2017-05-14 11:13 1136Java Socket通信实例:http://donald-d ... -
FileChanne定义
2017-05-12 23:28 942文件读写方式简单综述:http://donald-draper ... -
SeekableByteChannel接口定义
2017-05-11 08:43 1237ByteChannel,分散聚集通道接口的定义(SocketC ... -
FileChannel示例
2017-05-11 08:37 994前面我们看过socket通道,datagram通道,以管道Pi ... -
PipeImpl解析
2017-05-11 08:41 936ServerSocketChannel定义:http://do ... -
Pipe定义
2017-05-10 09:07 911Channel接口定义:http://donald-drape ... -
NIO-Pipe示例
2017-05-10 08:47 907PipeImpl解析:http://donald-draper ... -
DatagramChannelImpl 解析四(地址绑定,关闭通道等)
2017-05-10 08:27 782DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析三(多播)
2017-05-10 08:20 1910DatagramChannelImpl 解析一(初始化):ht ... -
NIO-UDP实例
2017-05-09 12:32 1587DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析二(报文发送与接收)
2017-05-09 09:03 1412DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析一(初始化)
2017-05-08 21:52 1412Channel接口定义:http://donald-drape ... -
MembershipKeyImpl 简介
2017-05-08 09:11 927MembershipKey定义:http://donald-d ... -
DatagramChannel定义
2017-05-07 23:13 1230Channel接口定义:http://donald-drape ... -
MulticastChanne接口定义
2017-05-07 13:45 1140NetworkChannel接口定义:ht ... -
MembershipKey定义
2017-05-06 16:20 918package java.nio.channels; i ...
相关推荐
资源名称:Java-NIO-Netty框架学习资源目录:【】Netty5.0架构剖析和源码解读【】Netty5用户指南【】Netty_in_Action(第五版-目录修正版)【】Netty_in_Action_v08_MEAP【】Netty_in_Action_v10_MEAP【】Netty_代码...
### Java NIO 的精彩细节解析 #### 一、Selector的Wakeup原理 ##### 1.1 背景介绍 在Java NIO (Non-blocking I/O)中,`Selector` 是核心组件之一,用于监控多个`Channel`上的I/O事件(如可读、可写等)。`...
文件放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载
java java线程小游戏,大鱼吃小鱼,实现了大体的模式,可以给想做小游戏的朋友一点参考.zip
施工人员检测54-YOLO(v5至v9)、COCO、CreateML、Darknet、Paligemma数据集合集.rarPPE_KIT 3-V4 2024-07-25 9:31 AM ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解和搜索非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 对于最先进的计算机视觉培训笔记本,您可以与此数据集一起使用 该数据集包括17334张图像。 PPE_KIT-3WE9-WQOV-IEQN-OGMT以可可格式注释。 将以下预处理应用于每个图像: *像素数据的自动取向(带有Exif-Arientation剥离) *调整大小为640x640(拉伸) 应用以下扩展来创建每个源图像的3个版本: * -15%至+15%之间的随机BRIGTHNESS调整 * 0到1.1像素之间的随机高斯模糊
文件放服务器下载,请务必到电脑端资源详情查看然后下载
文件太大放服务器下载,请务必到电脑端资源详情查看然后下载