`
Tyrion
  • 浏览: 260916 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Tomcat7中NIO处理分析(二)

阅读更多
  • 6.PollerEvent处理流程

Poller处理的核心是启动执行事件队列中的PollerEvent,接着从selector中遍历已经就绪的key,一旦发生了感兴趣的事件,则交由processSocket方法处理。PollerEvent的作用是向socket注册或更新感兴趣的事件:

    /**
     *
     * PollerEvent, cacheable object for poller events to avoid GC
     */
    public static class PollerEvent implements Runnable {

	// 每个PollerEvent都会保存NioChannel的引用
        protected NioChannel socket;
        protected int interestOps;
        protected KeyAttachment key;
        public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
            reset(ch, k, intOps);
        }

        public void reset(NioChannel ch, KeyAttachment k, int intOps) {
            socket = ch;
            interestOps = intOps;
            key = k;
        }

        public void reset() {
            reset(null, null, 0);
        }

        @Override
        public void run() {
            //socket第一次注册到selector中,完成对socket读事件的注册
            if ( interestOps == OP_REGISTER ) {
                try {
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                // socket之前已经注册到了selector中,更新socket所感兴趣的事件
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    boolean cancel = false;
                    if (key != null) {
                        final KeyAttachment att = (KeyAttachment) key.attachment();
                        if ( att!=null ) {
                            //handle callback flag
                            if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
                                att.setCometNotify(true);
                            } else {
                                att.setCometNotify(false);
                            }
                            interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                            // 刷新事件的最后访问时间,防止事件超时 
                            att.access();//to prevent timeout
                            //we are registering the key to start with, reset the fairness counter.
                            int ops = key.interestOps() | interestOps;
                            att.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            cancel = true;
                        }
                    } else {
                        cancel = true;
                    }
                    if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
                }catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
                    }catch (Exception ignore) {}
                }
            }//end if
        }//run

        @Override
        public String toString() {
            return super.toString()+"[intOps="+this.interestOps+"]";
        }
    }

 

 

  • 7.将socket交给Worker执行

在第5步的Poller处理流程的分析中看到它的run方法最后会调用processKey()处理selector检测到的通道事件,而在这个方法最后会调用processSocket来调用具体的通道处理逻辑,看下processSocket方法的实现:

    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); //will get reset upon next reg
            // 从SocketProcessor的缓存队列中取出一个来处理socket
            SocketProcessor sc = processorCache.poll();
            if ( sc == null ) sc = new SocketProcessor(socket,status);
            else sc.reset(socket,status);
            // 将有事件发生的socket交给Worker处理 
            if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            log.warn("Socket processing request was rejected for:"+socket,rx);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

Poller通过NioEndpoint的协调,将发生事件的socket交给工作者线程Worker来进一步处理。整个事件框架的工作就到此结束,下面就是Worker的处理。

 

 

  • 8.从socket中处理请求

在Tomcat6版本的NIO处理实现中有一个Worker类,在Tomcat7中把它去掉了,但工作者的职责还在,只是交由了上面看到的SocketProcessor这个类来担当,看下这个类的实现代码:

    // ---------------------------------------------- SocketProcessor Inner Class
    // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。
    /**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     */
    protected class SocketProcessor implements Runnable {

        // 每个SocketProcessor保存一个NioChannel的引用
        protected NioChannel socket = null;
        protected SocketStatus status = null;

        public SocketProcessor(NioChannel socket, SocketStatus status) {
            reset(socket,status);
        }

        public void reset(NioChannel socket, SocketStatus status) {
            this.socket = socket;
            this.status = status;
        }

        @Override
        public void run() {
            // 从socket中获取SelectionKey
            SelectionKey key = socket.getIOChannel().keyFor(
                    socket.getPoller().getSelector());
            KeyAttachment ka = null;

            if (key != null) {
                ka = (KeyAttachment)key.attachment();
            }

            // Upgraded connections need to allow multiple threads to access the
            // connection at the same time to enable blocking IO to be used when
            // NIO has been configured
            if (ka != null && ka.isUpgraded() &&
                    SocketStatus.OPEN_WRITE == status) {
                synchronized (ka.getWriteThreadLock()) {
                    doRun(key, ka);
                }
            } else {
                synchronized (socket) {
                    doRun(key, ka);
                }
            }
        }

        private void doRun(SelectionKey key, KeyAttachment ka) {
            try {
                int handshake = -1;

                try {
                    if (key != null) {
                        // For STOP there is no point trying to handshake as the
                        // Poller has been stopped.
                        if (socket.isHandshakeComplete() ||
                                status == SocketStatus.STOP) {
                            handshake = 0;
                        } else {
                            handshake = socket.handshake(
                                    key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            status = SocketStatus.OPEN_READ;
                        }
                    }
                }catch ( IOException x ) {
                    handshake = -1;
                    if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
                }catch ( CancelledKeyException ckx ) {
                    handshake = -1;
                }
                if ( handshake == 0 ) {
                    SocketState state = SocketState.OPEN;
                    // Process the request from this socket
                    if (status == null) {
                        // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求
                        state = handler.process(ka, SocketStatus.OPEN_READ);
                    } else {
                        state = handler.process(ka, status);
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket and pool
                        try {
                            close(ka, socket, key, SocketStatus.ERROR);
                        } catch ( Exception x ) {
                            log.error("",x);
                        }
                    }
                } else if (handshake == -1 ) {
                    close(ka, socket, key, SocketStatus.DISCONNECT);
                } else {
                    ka.getPoller().add(socket, handshake);
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key, null, false);
            } catch (OutOfMemoryError oom) {
                try {
                    oomParachuteData = null;
                    log.error("", oom);
                    if (socket != null) {
                        socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
                    }
                    releaseCaches();
                }catch ( Throwable oomt ) {
                    try {
                        System.err.println(oomParachuteMsg);
                        oomt.printStackTrace();
                    }catch (Throwable letsHopeWeDontGetHere){
                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                    }
                }
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            }catch ( Throwable t ) {
                log.error("",t);
                if (socket != null) {
                    socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
                }
            } finally {
                socket = null;
                status = null;
                //return to cache
                if (running && !paused) {
                    processorCache.offer(this);
                }
            }
        }

        private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,
                SocketStatus socketStatus) {
		...
        }
    }

可以看到由SocketProcessor寻找合适的Handler处理器做最终socket转换处理。

 

可以用下面这幅图总结一下NioEndpoint的主要流程:


Acceptor和Poller是线程数组,Worker是一个线程池(Executor)

 

 

1
1
分享到:
评论

相关推荐

    Tomcat7 性能优化

    适用于Tomcat7或Tomcat8在Windows系统中的默认设置,以及Linux系统中已安装APR的情况。 - **安装指南**:参考文档[https://my.oschina.net/lsw90/blog/181161](https://my.oschina.net/lsw90/blog/181161)进行具体...

    tomcat 7 及其源码

    Apache Tomcat 7 是一个广泛使用的开源Java Servlet容器,它实现了Java EE的Web应用程序部分,特别是Servlet和JSP规范。这个版本7.0.42是Tomcat 7的一个稳定版本,提供了对Java Servlet 3.0和JavaServer Pages (JSP)...

    Tomcat7优化.docx

    优化Tomcat7的目标是为了提高网站的并发处理能力,确保在高流量环境下仍然能保持良好的响应速度和服务稳定性。这通常涉及到调整服务器资源、配置设置、JVM参数等多个方面。 **服务器资源** 服务器的硬件资源如CPU、...

    tomcat7的源码包 tomcat7的压缩包

    7. **错误处理与日志记录**:Tomcat有内置的错误页面处理和日志系统,可以通过配置文件控制错误报告和日志级别,方便问题排查和性能分析。 8. **热部署**:Tomcat支持热部署,即在不重启服务器的情况下更新应用。当...

    tomcat7优化

    【Tomcat7优化】是为了提升Web应用程序的并发处理能力和整体性能。这涉及到多个层面的调整,包括服务器资源的充分利用、Tomcat配置的优化以及运行模式的选择。以下将详细阐述这些关键点。 首先,**服务器资源**是...

    Tomcat7 32位

    - **性能优化**:在Tomcat7中,对连接器(Coyote)进行了优化,提高了并发性能,支持NIO(非阻塞I/O)和APR(Apache Portable Runtime)库,以提高系统效率。 - **管理工具**:提供了一个基于Web的管理界面,可以...

    tomcat 7 64位操作系统

    在这个压缩包文件`apache-tomcat-7.0.21`中,我们将探讨Tomcat 7的关键特性、配置、安装及管理等知识点。 1. **Tomcat 7特性**: - 支持Java EE 6 Web Profile:Tomcat 7实现了Java Platform, Enterprise Edition ...

    apache-tomcat-7 源代码

    Apache Tomcat 7 是一个广泛使用的开源软件,用于部署和运行Java Servlets和JavaServer Pages (JSP)。它是Apache软件基金会的一个项目,是实现Java EE Web容器规范的重要组成部分,特别是Servlet 3.0和JSP 2.2规范。...

    tomcat7.070 源码及转成eclipse

    7. **线程模型**:Tomcat采用多线程模型处理请求,`Executor`接口及其实现允许自定义线程池,以优化并发性能。 8. **连接器与协议处理器**:Tomcat支持多种连接器,如基于NIO的` CoyoteNioProtocol`,基于APR的`...

    tomcat7 源码

    Tomcat7是一款广泛应用的开源Java Servlet容器,它实现了Java EE中的Web应用规范,包括Servlet、JSP和EL(Expression Language)等。源码分析是理解其工作原理和提升开发技能的重要途径。Apache Tomcat 7.0.33的源码...

    Tomcat_系统架构与模式设计分析.doc

    《Tomcat系统架构与模式设计分析》 Tomcat,作为一款广泛应用的Java Servlet容器,其系统架构和模式设计对于理解其高效稳定运行至关重要。本文将深入探讨Tomcat的核心组件、结构以及关键设计模式。 首先,Tomcat的...

    深入剖析TOMCAT+Tomcat权威指南(第二版)

    《深入剖析TOMCAT+Tomcat权威指南(第二版)》是两本关于Apache Tomcat服务器的重量级著作,它们详尽地阐述了Tomcat的内部工作机制、配置、优化以及故障排查等方面的知识,旨在帮助读者从新手到专家,全面掌握这款广泛...

    Tomcat权威指南(第二版)(中英文版)

    7. **集群与负载均衡**:介绍如何设置Tomcat集群,实现应用的高可用性和负载均衡。 8. **性能优化**:分析影响Tomcat性能的因素,提供调整JVM参数、缓存策略、线程池设置等优化技巧。 9. **故障排查**:分享诊断和...

    tomcat 系列 tomcat 系列

    通过以上分析,我们可以看出,每个Tomcat版本都有其特定的应用场景和优势。选择合适的版本,不仅关乎应用的性能,也直接影响到开发和维护的效率。理解各个版本的特点,有助于我们在实际工作中做出最佳决策。在升级或...

    Servlet API 和 NIO: 最终组合在一起

    Servlet API和NIO(New IO)是Java编程中两个重要的概念,它们在处理网络I/O操作上有着不同的优势。Servlet API是Java服务器端编程的重要组成部分,主要用于构建动态Web应用程序,而NIO则是一种非阻塞的I/O模型,...

    基于tomcat的连接数与线程池详解

    Tomcat 7和Tomcat 8的默认protocol处理逻辑不同,根据是否有APR本地库的支持来选择BIO或APR,而在Tomcat 8.5和Tomcat 9.0,不再支持BIO。 连接数和线程池的配置直接关联到Tomcat的性能和资源消耗。连接数决定了...

    apache-tomcat-7.0.81-src 源码免费下载

    9. **错误处理与日志系统**:Tomcat使用自定义的日志框架,源码中`logging`目录下的类定义了如何记录和处理错误信息。 10. **网络编程**:Tomcat底层使用NIO(非阻塞I/O)和BIO(阻塞I/O)模型,这在`java/org/...

    tomcat7的源代码

    通过分析源码,我们可以学习如何处理网络I/O,以及如何与Java的NIO(非阻塞I/O)集成。 5. **部署和生命周期管理**:Tomcat使用ContextDeployer来检测和部署Web应用。源码揭示了应用的加载、卸载、启动和停止的生命...

    tomcat性能优化.pdf

    例如,Tomcat中的sendfile实现在NIO通道中是支持的,而在BIO通道中则不支持。这些细节对性能有直接影响。 优化Tomcat中的compression压缩属性,可以减少数据传输量,提高响应速度,但需要与sendfile功能的互斥性...

    Tomcat7.0(源码+jar包)

    2. **NIO连接器**:除了传统的BIO连接器,Tomcat7.0还支持非阻塞I/O(NIO)模式,这使得在高并发环境下表现更优。 3. **改进的部署功能**:允许用户通过简单的XML文件或web应用的目录结构来部署应用,也支持热部署...

Global site tag (gtag.js) - Google Analytics