`
zddava
  • 浏览: 243624 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Tomcat NIO源代码分析(二) -- Poller

阅读更多
接着上面的流程,现在请求到了Poller的#register()方法

	public void register(final NioChannel socket) {
		socket.setPoller(this);
		// KeyAttachment是对NioChannel信息的包装,同样是非GC
		KeyAttachment key = keyCache.poll();
		final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
		ka.reset(this, socket, getSocketProperties().getSoTimeout());
		ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
		
		// PollerEvent的初始化,非GC Again
		PollerEvent r = eventCache.poll();
		// this is what OP_REGISTER turns into.
		// 读取数据的事件
		ka.interestOps(SelectionKey.OP_READ);
		if (r == null)
			r = new PollerEvent(socket, ka, OP_REGISTER);
		else
			r.reset(socket, ka, OP_REGISTER);
		
		// 把事件加到Poller
		addEvent(r);
	}

	public void addEvent(Runnable event) {
		// 把事件加入到队列中
		events.offer(event);
		// ++wakeupCounter
		if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
	}


其实也挺好懂的,就是把NioChannel作为OP_REGISTER事件注册到Poller,这样在Poller的#run()方法中就可以对加入Poller的事件进行处理了

	public void run() {
		while (running) {
			try {
				while (paused && (!close)) {
					try {
						Thread.sleep(100);
					} catch (InterruptedException e) {
						// Ignore
					}
				}
				boolean hasEvents = false;

				hasEvents = (hasEvents | events());
				// Time to terminate?
				if (close) {
					timeout(0, false);
					break;
				}
				try {
					if (!close) {
						if (wakeupCounter.get() > 0) {
							// 立刻返回 I/O 就绪的那些通道的键
							keyCount = selector.selectNow();
						} else {
							keyCount = selector.keys().size();
							// 这里把wakeupCounter设成-1,在addEvent的时候就会唤醒selector
							wakeupCounter.set(-1);
							// 使用阻塞的方式
							keyCount = selector.select(selectorTimeout);
						}
						wakeupCounter.set(0);
					}
					if (close) {
						timeout(0, false);
						selector.close();
						break;
					}
				} catch (NullPointerException x) {
					// sun bug 5076772 on windows JDK 1.5
					if (log.isDebugEnabled())
						log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
					if (wakeupCounter == null || selector == null)
						throw x;
					continue;
				} catch (CancelledKeyException x) {
					// sun bug 5076772 on windows JDK 1.5
					if (log.isDebugEnabled())
						log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
					if (wakeupCounter == null || selector == null)
						throw x;
					continue;
				} catch (Throwable x) {
					ExceptionUtils.handleThrowable(x);
					log.error("", x);
					continue;
				}
				// either we timed out or we woke up, process events first
				if (keyCount == 0)
					hasEvents = (hasEvents | events());

				Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator()
						: null;
				// Walk through the collection of ready keys and dispatch
				// any active event.
				while (iterator != null && iterator.hasNext()) {
					SelectionKey sk = iterator.next();
					// 这里的KeyAttachment实在#register()方法中注册的
					KeyAttachment attachment = (KeyAttachment) sk.attachment();
					attachment.access();
					iterator.remove();
					// 继续流程
					processKey(sk, attachment);
				}// while

				// process timeouts
				timeout(keyCount, hasEvents);
				if (oomParachute > 0 && oomParachuteData == null)
					checkParachute();
			} catch (OutOfMemoryError oom) {
				try {
					oomParachuteData = null;
					releaseCaches();
					log.error("", oom);
				} catch (Throwable oomt) {
					try {
						System.err.println(oomParachuteMsg);
						oomt.printStackTrace();
					} catch (Throwable letsHopeWeDontGetHere) {
						ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
					}
				}
			}
		}// while
		synchronized (this) {
			this.notifyAll();
		}
		stopLatch.countDown();

	}


这个方法有2个方法需要关注一下:#events()和#processKey():

	public boolean events() {
		boolean result = false;
		// synchronized (events) {
		Runnable r = null;
		// 返回是事件队列中是否有事件
		result = (events.size() > 0);
		while ((r = events.poll()) != null) {
			try {
				// 执行KeyEvent的#run()
				r.run();
				if (r instanceof PollerEvent) {
					((PollerEvent) r).reset();
					// 对KeyEvent进行回收
					eventCache.offer((PollerEvent) r);
				}
			} catch (Throwable x) {
				log.error("", x);
			}
		}
		// events.clear();
		// }
		return result;
	}


这里执行了SocketChannel对应的KeyEvent的#run()方法,在这个方法里给SocketChannel注册了OP_READ:

	public void run() {
		if (interestOps == OP_REGISTER) {
			try {
				// 给SocketChannel注册OP_READ
				socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ,
						key);
			} catch (Exception x) {
				log.error("", x);
			}
		} else {
			// 这里应该是对comet进行支持的,暂时先不看

			......

		}// end if
	}// run


第二个是#processKey()方法,里边的很多流程我现在不是很关心,都略去了,

	protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
		boolean result = true;
		try {
			if (close) {
				cancelledKey(sk, SocketStatus.STOP, false);
			} else if (sk.isValid() && attachment != null) {
				attachment.access();// make sure we don't time out valid sockets
				sk.attach(attachment);// cant remember why this is here
				NioChannel channel = attachment.getChannel();
				if (sk.isReadable() || sk.isWritable()) {
					if (attachment.getSendfileData() != null) {
						processSendfile(sk, attachment, true, false);
					} else if (attachment.getComet()) {// 这里应该是对comet的支持
						......
					} else {
						// 这个分支是现在比较关心的
						if (isWorkerAvailable()) {// 这个好像还没实现
							// 这个#unreg()很巧妙,防止了通道对同一个事件不断select的问题
							unreg(sk, attachment, sk.readyOps());
							boolean close = (!processSocket(channel, null, true));
							if (close) {
								cancelledKey(sk, SocketStatus.DISCONNECT, false);
							}
						} else {
							result = false;
						}
					}
				}
			} else {
				// invalid key
				cancelledKey(sk, SocketStatus.ERROR, false);
			}
		} catch (CancelledKeyException ckx) {
			cancelledKey(sk, SocketStatus.ERROR, false);
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			log.error("", t);
		}
		return result;
	}

	protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
		reg(sk, attachment, sk.interestOps() & (~readyOps));
	}

	protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
		sk.interestOps(intops);
		attachment.interestOps(intops);
		attachment.setCometOps(intops);
	}


这里的#unreg()方法据我理解应该很巧妙的解决了重复的IO事件问题,我自己写的测试用的NIO代码里就会有这个问题。

这样,流程就来到了Poller最后的#processSocket()方法了:

	public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
		try {
			KeyAttachment attachment = (KeyAttachment) socket.getAttachment(false);
			attachment.setCometNotify(false); // will get reset upon next reg
			// 使用SocketProcessor
			SocketProcessor sc = processorCache.poll();
			if (sc == null)
				sc = new SocketProcessor(socket, status);
			else
				sc.reset(socket, status);
			if (dispatch && getExecutor() != null)// 如果配置了ThreadPoolExecutor,那么使用它来执行
				getExecutor().execute(sc);
			else
				sc.run();
		} catch (RejectedExecutionException rx) {
			log.warn("Socket processing request was rejected for:" + socket, rx);
			return false;
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			// This means we got an OOM or similar creating a thread, or that
			// the pool and its queue are full
			log.error(sm.getString("endpoint.process.fail"), t);
			return false;
		}
		return true;
	}


这里SocketProcessor的#run()方法就不列出了,里边最后会通过下面的语句将流程转到Http11NioProtocol类,其中的handler就是对Http11NioProtocol的引用:

        SocketState state = SocketState.OPEN;
        state = (status==null)?handler.process(socket):handler.event(socket,status);


最后,对Acceptor和Poller的处理过程做个小结,见下图:


  • 大小: 27.2 KB
1
0
分享到:
评论

相关推荐

    xnio-nio-3.8.4.Final-API文档-中英对照版.zip

    赠送源代码:xnio-nio-3.8.4.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.4.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.4.Final-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org....

    xnio-nio-3.8.0.Final-API文档-中文版.zip

    赠送源代码:xnio-nio-3.8.0.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.0.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.0.Final-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.jboss.xnio:...

    xnio-nio-3.8.4.Final-API文档-中文版.zip

    赠送源代码:xnio-nio-3.8.4.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.4.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.4.Final-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.jboss.xnio:...

    xnio-nio-3.8.0.Final-API文档-中英对照版.zip

    赠送源代码:xnio-nio-3.8.0.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.0.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.0.Final-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org....

    httpcore-nio-4.4.15-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.15-sources.jar 包含翻译后的API文档:httpcore-nio-4.4.15-javadoc-API文档-中文(简体)版.zip 对应Maven信息:groupId:org.apache.httpcomponents,artifactId:httpcore-nio,...

    apache-tomcat-8.5.64-windows-x64.zip

    8. **性能优化**:可以通过调整`server.xml`中的线程池设置、启用NIO连接器、增加JVM堆大小等方式优化Tomcat性能。 9. **集成其他应用服务器**:Tomcat可以与其他Java EE服务器(如JBoss、WebLogic)一起使用,作为...

    apache-tomcat-8.5.97-windows-x.zip

    Apache Tomcat 是一个开源软件应用服务器,主要用于部署和运行Java Servlet和JavaServer Pages(JSP)应用程序。这个压缩包文件 "apache-tomcat-8.5.97-windows-x.zip" 包含了Apache Tomcat 8.5.97 版本在Windows...

    Tomcat8源代码

    **Apache Tomcat 8源代码解析** Apache Tomcat是一款开源的Java Servlet容器,它实现了Java Servlet和JavaServer Pages(JSP)规范,是许多Web应用开发者的重要工具。Tomcat 8是其发展的一个重要版本,引入了许多新...

    apache-tomcat-9.0.21-windows-x64.zip

    3. **NIO.2和APR**:Tomcat 9.0.21包含了对Java NIO.2的全面支持,以及可选的Apache Portable Runtime (APR)库,APR利用操作系统级别的特性,如sendfile和epoll,以提高性能和可伸缩性。 4. **JSP 2.3和EL 3.0**:...

    httpcore-nio-4.4.10-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.10-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.10.pom; 包含翻译后的API文档:httpcore-nio-4.4.10-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org....

    tomcat6 源代码

    这个源代码压缩包提供了Tomcat6的完整源码,对于开发者来说,深入理解其内部工作原理、优化性能或者定制功能都具有极大的价值。下面将详细介绍Tomcat6的一些关键知识点。 1. **Servlet容器**: Tomcat作为一个...

    apache-tomcat-7.0.88-src.tar

    在本文中,我们将深入探讨`apache-tomcat-7.0.88-src.tar`这个源代码包,了解其核心组件、工作原理以及如何通过源代码学习和定制Tomcat。 首先,`apache-tomcat-7.0.88-src.tar`是一个包含Apache Tomcat 7.0.88版本...

    httpcore-nio-4.4.6-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.6-sources.jar 包含翻译后的API文档:httpcore-nio-4.4.6-javadoc-API文档-中文(简体)版.zip 对应Maven信息:groupId:org.apache.httpcomponents,artifactId:httpcore-nio,...

    httpcore-nio-4.4.15-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.15-sources.jar 包含翻译后的API文档:httpcore-nio-4.4.15-javadoc-API文档-中文(简体)-英语-对照版.zip 对应Maven信息:groupId:org.apache.httpcomponents,artifactId:...

    最新版windows apache-tomcat-9.0.54-windows-x64.zip

    4. **NIO2 Connector**:Tomcat 9.0引入了NIO2连接器,提供非阻塞I/O,适用于高并发场景,可以提高服务器性能。 5. **JAR Scanning**:为提高安全性,Tomcat现在可以扫描JAR文件中的特定元素,如TLDs,以便自动注册...

    tomcat7源代码

    《深入剖析Tomcat7源代码》 Tomcat7是一款广泛使用的开源Java Servlet容器,它实现了Java EE中的Web应用规范,包括Servlet、JSP和EL(Expression Language)等。本资源包含Tomcat7的源代码以及运行所需的jar包,...

    httpcore-nio-4.4.10-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.10-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.10.pom; 包含翻译后的API文档:httpcore-nio-4.4.10-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache....

    网络编程(socket、NIO、mina)---demo

    在这个"网络编程(socket、NIO、mina)---demo"的主题中,我们将深入探讨三个关键概念:Socket编程、非阻塞I/O(Non-blocking I/O,简称NIO)以及Apache Mina框架。这些技术广泛应用于构建高性能、高并发的网络应用...

    tomcat-native-1.2.25-src-build

    "tomcat-native-1.2.25-src-build" 指的是该扩展的源代码版本1.2.25,用于编译生成适用于不同操作系统的本地库文件。 【编译过程】:下载的源码包"tomcat-native-1.2.25-src-build"需要经过编译才能生成适用于目标...

    apache-tomcat-9.0.74-windows-x64

    Apache Tomcat 9.0.74 是一个广泛使用的开源软件,它是一个实现了Java Servlet、JavaServer Pages(JSP)和Java EE的Web应用程序容器。这个版本是专门为Windows x64平台设计的,确保在64位操作系统上高效运行。在...

Global site tag (gtag.js) - Google Analytics