浏览 9725 次
精华帖 (0) :: 良好帖 (2) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2013-04-09
最后修改:2013-04-09
出处:http://gearever.iteye.com
上一篇简单记录了缺省配置的connector的内部构造及消息流,同时此connector也是基于BIO的实现。除了BIO外,也可以通过配置快速部署NIO的connector。在server.xml中如下配置; <Connector port="80" URIEncoding="UTF-8" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" redirectPort="7443" /> 整个tomcat是一个比较完善的框架体系,各个组件之间都是基于接口的实现,所以比较方便扩展和替换。像这里的“org.apache.coyote.http11.Http11NioProtocol”和BIO的“org.apache.coyote.http11.Http11Protocol”都是统一的实现org.apache.coyote.ProtocolHandler接口,所以从整体结构上来说,NIO还是与BIO的实现保持大体一致。 首先来看一下NIO connector的内部结构,箭头方向还是消息流; 还是可以看见connector中三大块
基本功能与BIO的类似,参见tomcat架构分析(connector BIO实现)。重点看看Http11NioProtocol. 和JIoEndpoint一样,NioEndpoint是Http11NioProtocol中负责接收处理socket的主要模块。但是在结构上比JIoEndpoint要复杂一些,毕竟是非阻塞的。但是需要注意的是,tomcat的NIO connector并非完全是非阻塞的,有的部分,例如接收socket,从socket中读写数据等,还是阻塞模式实现的,在后面会逐一介绍。 如图所示,NioEndpoint的主要流程; 图中Acceptor及Worker分别是以线程池形式存在,Poller是一个单线程。注意,与BIO的实现一样,缺省状态下,在server.xml中没有配置<Executor>,则以Worker线程池运行,如果配置了<Executor>,则以基于java concurrent 系列的java.util.concurrent.ThreadPoolExecutor线程池运行。 Acceptor 接收socket线程,这里虽然是基于NIO的connector,但是在接收socket方面还是传统的serverSocket.accept()方式,获得SocketChannel对象,然后封装在一个tomcat的实现类org.apache.tomcat.util.net.NioChannel对象中。然后将NioChannel对象封装在一个PollerEvent对象中,并将PollerEvent对象压入events queue里。这里是个典型的生产者-消费者模式,Acceptor与Poller线程之间通过queue通信,Acceptor是events queue的生产者,Poller是events queue的消费者。 Poller Poller线程中维护了一个Selector对象,NIO就是基于Selector来完成逻辑的。在connector中并不止一个Selector,在socket的读写数据时,为了控制timeout也有一个Selector,在后面的BlockSelector中介绍。可以先把Poller线程中维护的这个Selector标为主Selector。 Poller是NIO实现的主要线程。首先作为events queue的消费者,从queue中取出PollerEvent对象,然后将此对象中的channel以OP_READ事件注册到主Selector中,然后主Selector执行select操作,遍历出可以读数据的socket,并从Worker线程池中拿到可用的Worker线程,然后将socket传递给Worker。整个过程是典型的NIO实现。 Worker Worker线程拿到Poller传过来的socket后,将socket封装在SocketProcessor对象中。然后从Http11ConnectionHandler中取出Http11NioProcessor对象,从Http11NioProcessor中调用CoyoteAdapter的逻辑,跟BIO实现一样。在Worker线程中,会完成从socket中读取http request,解析成HttpServletRequest对象,分派到相应的servlet并完成逻辑,然后将response通过socket发回client。在从socket中读数据和往socket中写数据的过程,并没有像典型的非阻塞的NIO的那样,注册OP_READ或OP_WRITE事件到主Selector,而是直接通过socket完成读写,这时是阻塞完成的,但是在timeout控制上,使用了NIO的Selector机制,但是这个Selector并不是Poller线程维护的主Selector,而是BlockPoller线程中维护的Selector,称之为辅Selector。 NioSelectorPool NioEndpoint对象中维护了一个NioSelecPool对象,这个NioSelectorPool中又维护了一个BlockPoller线程,这个线程就是基于辅Selector进行NIO的逻辑。以执行servlet后,得到response,往socket中写数据为例,最终写的过程调用NioBlockingSelector的write方法。 public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key == null ) throw new IOException("Key no longer registered"); KeyAttachment att = (KeyAttachment) key.attachment(); int written = 0; boolean timedout = false; int keycount = 1; //assume we can write long time = System.currentTimeMillis(); //start the timeout timer try { while ( (!timedout) && buf.hasRemaining()) { if (keycount > 0) { //only write if we were registered for a write //直接往socket中写数据 int cnt = socket.write(buf); //write the data lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); written += cnt; //写数据成功,直接进入下一次循环,继续写 if (cnt > 0) { time = System.currentTimeMillis(); //reset our timeout timer continue; //we successfully wrote, try again without a selector } } //如果写数据返回值cnt等于0,通常是网络不稳定造成的写数据失败 try { //开始一个倒数计数器 if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); //将socket注册到辅Selector,这里poller就是BlockSelector线程 poller.add(att,SelectionKey.OP_WRITE); //阻塞,直至超时时间唤醒,或者在还没有达到超时时间,在BlockSelector中唤醒 att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); } if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) { keycount = 0; }else { //还没超时就唤醒,说明网络状态恢复,继续下一次循环,完成写socket keycount = 1; att.resetWriteLatch(); } if (writeTimeout > 0 && (keycount == 0)) timedout = (System.currentTimeMillis() - time) >= writeTimeout; } //while if (timedout) throw new SocketTimeoutException(); } finally { poller.remove(att,SelectionKey.OP_WRITE); if (timedout && key != null) { poller.cancelKey(socket, key); } } return written; } 也就是说当socket.write()返回0时,说明网络状态不稳定,这时将socket注册OP_WRITE事件到辅Selector,由BlockPoller线程不断轮询这个辅Selector,直到发现这个socket的写状态恢复了,通过那个倒数计数器,通知Worker线程继续写socket动作。看一下BlockSelector线程的逻辑; public void run() { while (run) { try { ...... Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; while (run && iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { attachment.access(); iterator.remove(); ; sk.interestOps(sk.interestOps() & (~sk.readyOps())); if ( sk.isReadable() ) { countDown(attachment.getReadLatch()); } //发现socket可写状态恢复,将倒数计数器置位,通知Worker线程继续 if (sk.isWritable()) { countDown(attachment.getWriteLatch()); } }catch (CancelledKeyException ckx) { if (sk!=null) sk.cancel(); countDown(attachment.getReadLatch()); countDown(attachment.getWriteLatch()); } }//while }catch ( Throwable t ) { log.error("",t); } } events.clear(); try { selector.selectNow();//cancel all remaining keys }catch( Exception ignore ) { if (log.isDebugEnabled())log.debug("",ignore); } } 使用这个辅Selector主要是减少线程间的切换,同时还可减轻主Selector的负担。以上描述了NIO connector工作的主要逻辑,可以看到在设计上还是比较精巧的。NIO connector还有一块就是Comet,有时间再说吧。需要注意的是,上面从Acceptor开始,有很多对象的封装,NioChannel及其KeyAttachment,PollerEvent和SocketProcessor对象,这些不是每次都重新生成一个新的,都是NioEndpoint分别维护了它们的对象池; ConcurrentLinkedQueue<SocketProcessor> processorCache = new ConcurrentLinkedQueue<SocketProcessor>() ConcurrentLinkedQueue<KeyAttachment> keyCache = new ConcurrentLinkedQueue<KeyAttachment>() ConcurrentLinkedQueue<PollerEvent> eventCache = new ConcurrentLinkedQueue<PollerEvent>() ConcurrentLinkedQueue<NioChannel> nioChannels = new ConcurrentLinkedQueue<NioChannel>() 当需要这些对象时,分别从它们的对象池获取,当用完后返回给相应的对象池,这样可以减少因为创建及GC对象时的性能消耗。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2013-04-09
讨论一下 图中Acceptor及Worker分别是以线程池形式存在,Poller是一个单线程。
Acceptor是单个线程吧我在tomcat7.0.35版本中看到 // Initialize thread count defaults for acceptor, poller if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } 虽然acceptor线程能设置多个 但是我没有想到设置多个的原因。 Poller线程不是单个吧如果nio的底层实现用的select,单个poller效率估计会有问题。 |
|
返回顶楼 | |
发表时间:2013-04-09
xiaoZ5919 写道 讨论一下 图中Acceptor及Worker分别是以线程池形式存在,Poller是一个单线程。
Acceptor是单个线程吧我在tomcat7.0.35版本中看到 // Initialize thread count defaults for acceptor, poller if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } 虽然acceptor线程能设置多个 但是我没有想到设置多个的原因。 Poller线程不是单个吧如果nio的底层实现用的select,单个poller效率估计会有问题。 1.缺省状态下确实有acceptorThreadCount = 1;的设置,但是起Acceptor时还是这种方式 for (int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); acceptorThread.setDaemon(daemon); acceptorThread.start(); } 而且有个public 的setAcceptorThreadCount接口,所以我说以线程池形式存在; 2.Poller线程是单线程,当它发现有可读的socket时,会获取Worker线程(如果没有executor的话)来处理这个socket的读写,它自己不执行具体的逻辑; 3.以上是基于6.0.20的 |
|
返回顶楼 | |
发表时间:2013-04-09
Poller也是可以设置为多个,我说错了我的版本是7.0.25其中对pollerThreadCount做了一个优化
protected int pollerThreadCount = Runtime.getRuntime().availableProcessors(); |
|
返回顶楼 | |
发表时间:2013-04-11
tomcat 源码分析
|
|
返回顶楼 | |
发表时间:2013-04-11
acceptor一个够了,多了怕引起惊群问题。
不过不明为什么accptor不用selector方式?? |
|
返回顶楼 | |
发表时间:2013-04-12
是啊 这大概也是tomcat和nginx架构的区别,想nginx每个worker(Poller)即负责accpet又负责IO读写,抢占式accept会使每个worker的负载更加均衡,但同时也会带来惊群的问题。当worker线程被阻塞时会影响accept。Tomcat的架构则采用单独的accept,然后根据一定的负载策略分到Poller上。
|
|
返回顶楼 | |