`
zddava
  • 浏览: 243627 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Tomcat NIO源代码分析(一) -- Acceptor

阅读更多
这里主要讲一下Tomcat使用NIO启动和进行请求处理的大致流程,使用的源码版本是7.0.5,对于其他处理等流程就不写了,我在别的文章里已经大致写过了,不过是用的6.0版本:http://zddava.iteye.com/category/53603

当Tomcat配置成使用NIO时,启动过程其实和过去差不多,也是Connector#startInternal -> Protocol(Http11NioProtocol)#start() -> Endpoint(NioEndPoint)#start()的过程,这里主要看一下NioEndPoint:

	public void start() throws Exception {
		// 初始化
		if (!initialized) {
			init();
		}
		if (!running) {
			running = true;
			paused = false;

			// 创建一个ThreadPoolExecutor对象,和JDK里的功能一样,只不过进行了一些扩展
			if (getExecutor() == null) {
				createExecutor();
			}

			// 开启poll的线程
			pollers = new Poller[getPollerThreadCount()];
			for (int i = 0; i < pollers.length; i++) {
				pollers[i] = new Poller();
				Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
				pollerThread.setPriority(threadPriority);
				pollerThread.setDaemon(true);
				pollerThread.start();
			}

			// 开启Acceptor的线程
			for (int i = 0; i < acceptorThreadCount; i++) {
				Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
				acceptorThread.setPriority(threadPriority);
				acceptorThread.setDaemon(getDaemon());
				acceptorThread.start();
			}
		}
	}


这里先看一下init()方法,没有全列出来,最主要的一点就是初始化ServerSocketChannel:

	public void init() throws Exception {

		if (initialized)
			return;

		// 初始化ServerSocketChannel,这里用的是阻塞的方式,没有用Selector
		serverSock = ServerSocketChannel.open();
		socketProperties.setProperties(serverSock.socket());
		InetSocketAddress addr = (getAddress() != null ? new InetSocketAddress(getAddress(), getPort())
				: new InetSocketAddress(getPort()));
		serverSock.socket().bind(addr, getBacklog());
		serverSock.configureBlocking(true); // mimic APR behavior
		serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());

		......

	}


Tomcat每种Endpoint的Acceptor线程其实作用都一样,对来访的请求进行最初的处理之用,NioEndpoint的Acceptor也不例外,它内部也只定义一个继承自Runnable的方法:

        public void run() {
            while (running) {
                
                while (paused && running) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                try {
		    // 接受请求
                    SocketChannel socket = serverSock.accept();
                    if ( running && (!paused) && socket != null ) {
                    	// 将SocketChannel给pollor处理
                        if (!setSocketOptions(socket)) {
                            try {
                                socket.socket().close();
                                socket.close();
                            } catch (IOException ix) {
                                if (log.isDebugEnabled())
                                    log.debug("", ix);
                            }
                        } 
                    }
                } catch (SocketTimeoutException sx) {
                    //normal condition
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){
                                ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                            }
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
        }
    }


方法其实挺容易理解,就是得到请求用的SocketChannel后交给Poller处理,这里poll是一个UNIX的系统调用名字,Java开发者可以google下,我也是才准备开始啃《UNIX网络编程》,闲言少叙,看一下#setSocketOptions()方法吧:

	protected boolean setSocketOptions(SocketChannel socket) {
		// Process the connection
		try {
			// disable blocking, APR style, we are gonna be polling it
			// 这里终于看到了印象中的NIO的影子了
			socket.configureBlocking(false);
			Socket sock = socket.socket();
			socketProperties.setProperties(sock);

			// NioChannel是ByteChannel的子类
			// 从队列里取出第一个可用的Channel,这样的话NioChannel应该是设计成非GC的
			// 感觉其目的主要是对SocketChannel进行下封装
			NioChannel channel = nioChannels.poll();
			if (channel == null) {
				// 不过这里如果没有可用的就初始化一个的话请求数陡然增高再慢慢回落的时候不就浪费了内存了吗?
				// NioBufferHandler里分别分配了读缓冲区和写缓冲区
				// SSL setup
				if (sslContext != null) {
					SSLEngine engine = createSSLEngine();
					int appbufsize = engine.getSession().getApplicationBufferSize();
					NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,
							socketProperties.getAppReadBufSize()), Math.max(appbufsize,
							socketProperties.getAppWriteBufSize()), socketProperties.getDirectBuffer());
					channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
				} else {
					// normal tcp setup
					NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
							socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer());

					channel = new NioChannel(socket, bufhandler);
				}
			} else {
				// 这里就是对Channel的重用了
				channel.setIOChannel(socket);
				if (channel instanceof SecureNioChannel) {
					SSLEngine engine = createSSLEngine();
					((SecureNioChannel) channel).reset(engine);
				} else {
					channel.reset();
				}
			}
			// 这里就是将SocketChannel注册到Poller了。
			// getPoller0用的循环的方式来返回Poller,即Poller 1, 2, 3... n 然后再回到1, 2, 3....
			getPoller0().register(channel);
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			try {
				log.error("", t);
			} catch (Throwable tt) {
				ExceptionUtils.handleThrowable(t);
			}
			// Tell to close the socket
			return false;
		}
		return true;
	}


好了,终于到了Poller了,下一篇开始Poller。
3
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics