`
liudeh_009
  • 浏览: 243525 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Jetty基于NIO的方式处理请求

阅读更多

       Jetty基于NIO的方式处理请求的类是SelectChannelConnector,该类同样继承AbstractLifeCycle类,SelectChannelConnector初始化的时候会调用AbstractLifeCycle类的start()方法,如下:

       

 public final void start() throws Exception
    {
        synchronized (_lock)
        {
            try
            {
                if (_state == STARTED || _state == STARTING)
                    return;
                setStarting();
                doStart();
                Log.debug("started {}",this);
                setStarted();
            }
            catch (Exception e)
            {
                setFailed(e);
                throw e;
            }
            catch (Error e)
            {
                setFailed(e);
                throw e;
            }
        }
    }

   doStart()方法在SelectChannelConnector类中.如下:

  

   protected void doStart() throws Exception
    {
        _manager.setSelectSets(getAcceptors());//设置接收请求的线程个数,默认1个
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
        _manager.start();//初始化Selector
         open();//初始化ServerSocketChannel
        _manager.register(_acceptChannel);
        super.doStart();
    }

   _manager类名为SelectorManager,open()方法如下:

  

    public void open() throws IOException
    {
        synchronized(this)
        {
            if (_acceptChannel == null)
            {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                // Set to non blocking mode
                _acceptChannel.configureBlocking(false);
                
            }
        }
    }

     super.doStart()方法如下:

   

 protected void doStart() throws Exception
    {
        if (_server==null)
            throw new IllegalStateException("No server");
        
        // open listener port
        open();//再一次调用open()方法,确保ServerSocketChannel启动,调用两次就能确保启动?
        super.doStart();
        
        if (_threadPool==null)
            _threadPool=_server.getThreadPool();
        if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle))
            ((LifeCycle)_threadPool).start();
        
        // Start selector thread
        synchronized(this)
        {
            _acceptorThread=new Thread[getAcceptors()];

            for (int i=0;i<_acceptorThread.length;i++)
            {
                if (!_threadPool.dispatch(new Acceptor(i)))//启动接受请求的线程
                {
                    Log.warn("insufficient maxThreads configured for {}",this);
                    break;
                }
            }
        }
        
        Log.info("Started {}",this);
    }

     Acceptor线程的run()方法如下:

  

 public void run()
        {   
            Thread current = Thread.currentThread();
            String name;
            synchronized(AbstractConnector.this)//设置当前线程的名字,是不是太复杂点
            {
                if (_acceptorThread==null)
                    return;
                
                _acceptorThread[_acceptor]=current;
                name =_acceptorThread[_acceptor].getName();
                current.setName(name+" - Acceptor"+_acceptor+" "+AbstractConnector.this);
            }
            int old_priority=current.getPriority();
            
            try
            {
                current.setPriority(old_priority-_acceptorPriorityOffset);
                while (isRunning() && getConnection()!=null)//connector初始化并且ServerSocketChannel存在
                {
                    try
                    {
                        accept(_acceptor);//处理收到的请求 
                    }
                    catch(EofException e)
                    {
                        Log.ignore(e);
                    }
                    catch(IOException e)
                    {
                        Log.ignore(e);
                    }
                    catch(ThreadDeath e)
                    {
                        throw e;
                    }
                    catch(Throwable e)
                    {
                        Log.warn(e);
                    }
                }
            }
            finally
            {   
                current.setPriority(old_priority);
                current.setName(name);
                
                synchronized(AbstractConnector.this)
                {
                    if (_acceptorThread!=null)
                        _acceptorThread[_acceptor]=null;
                }
            }
        }

       accept(_acceptor)最终会调用SelectorManager.SelectSet.doSelect()方法,该方法比较复杂,简单来说就是每接受一个请求就注册到Selector上,并且用SelectChannelEndPoint类(本身也是一个线程)处理请求,SelectChannelEndPoint类的run()方法如下:

    

   public void run()
    {
        try
        {
            _connection.handle();
        }
        catch (ClosedChannelException e)
        {
            Log.ignore(e);
        }
        catch (EofException e)
        {
            Log.debug("EOF", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        }
        catch (HttpException e)
        {
            Log.debug("BAD", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        }
        catch (Throwable e)
        {
            Log.warn("handle failed", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        }
        finally
        {
            undispatch();
        }
    }

      _connection类的类名为HttpConnection,HttpConnection的handle()方法如下:

 

      

   public void handle() throws IOException
    {
        // Loop while more in buffer
        boolean more_in_buffer = true; // assume true until proven otherwise
        int no_progress = 0;

        while (more_in_buffer)
        {
            try
            {
                synchronized (this)
                {
                    if (_handling)
                        throw new IllegalStateException(); // TODO delete this
                                                           // check
                    _handling = true;
                }

                setCurrentConnection(this);
                long io = 0;

                Continuation continuation = _request.getContinuation();//得到RetryContinuation
                if (continuation != null && continuation.isPending())
                {
                    Log.debug("resume continuation {}",continuation);
                    if (_request.getMethod() == null)
                        throw new IllegalStateException();
                    handleRequest();//处理http请求,执行filter,servlet等
                }
                else//解析http请求
                {
                    // If we are not ended then parse available
                    if (!_parser.isComplete())
                        io = _parser.parseAvailable();

                    // Do we have more generating to do?
                    // Loop here because some writes may take multiple steps and
                    // we need to flush them all before potentially blocking in
                    // the
                    // next loop.
                    while (_generator.isCommitted() && !_generator.isComplete())
                    {
                        long written = _generator.flush();
                        io += written;
                        if (written <= 0)
                            break;
                        if (_endp.isBufferingOutput())
                            _endp.flush();
                    }

                    // Flush buffers
                    if (_endp.isBufferingOutput())
                    {
                        _endp.flush();
                        if (!_endp.isBufferingOutput())
                            no_progress = 0;
                    }

                    if (io > 0)
                        no_progress = 0;
                    else if (no_progress++ >= 2)
                        return;
                }
            }
            catch (HttpException e)
            {
                if (Log.isDebugEnabled())
                {
                    Log.debug("uri=" + _uri);
                    Log.debug("fields=" + _requestFields);
                    Log.debug(e);
                }
                _generator.sendError(e.getStatus(),e.getReason(),null,true);

                _parser.reset(true);
                _endp.close();
                throw e;
            }
            finally
            {
                setCurrentConnection(null);

                more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();

                synchronized (this)
                {
                    _handling = false;

                    if (_destroy)
                    {
                        destroy();
                        return;
                    }
                }

                if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
                {
                    if (!_generator.isPersistent())
                    {
                        _parser.reset(true);
                        more_in_buffer = false;
                    }

                    if (more_in_buffer)
                    {
                        reset(false);
                        more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
                    }
                    else
                        reset(true);

                    no_progress = 0;
                }

                Continuation continuation = _request.getContinuation();
                if (continuation != null && continuation.isPending())
                {
                    break;
                }else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof SelectChannelEndPoint) // TODO
                        ((SelectChannelEndPoint)_endp).setWritable(false);
            }
        }
    }
1
1
分享到:
评论

相关推荐

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式

    1. **分离职责**:将连接监听、请求监听和请求处理分开,通过不同的线程或组件处理,提高效率。 2. **异步处理**:利用NIO的非阻塞特性,确保Selector能持续监听新的连接请求。 3. **多线程并发**:根据系统资源动态...

    jetty 8及依赖包

    学习Jetty 8,你可以深入理解Web服务器的内部机制,包括线程模型、请求处理流程、以及如何利用Servlet和WebSocket构建现代Web应用。通过分析源代码和实验,你可以了解到如何自定义配置Jetty以适应特定的应用需求,...

    jetty6.1.6-2

    7. **配置灵活性**:Jetty提供了多种方式来配置服务器,包括XML配置文件、程序化API以及基于注解的配置。 8. **模块化**:Jetty的lib目录可能包含多个模块的JAR,如jetty-util.jar、jetty-http.jar等,这种模块化...

    i-jetty源码

    1. **事件驱动**:i-jetty采用非阻塞I/O(NIO)模型,基于Java的Selector API,以事件驱动的方式处理网络通信,提高系统资源利用率。 2. **模块化**:i-jetty的架构高度模块化,每个功能组件都可以独立加载和卸载,...

    jetty嵌入式服务器实例大全

    Jetty支持Servlet 3.0引入的异步处理,允许Servlet在非阻塞模式下处理请求,提高服务器性能。 10. **嵌入式使用**: 最大的特点是Jetty的嵌入式特性,可以直接在你的应用程序中启动和停止Jetty服务器,无需独立...

    jetty指导书

    Jetty支持异步Servlet,通过Continuations机制可以实现非阻塞的处理方式。 **14.2 AJAX** AJAX技术在现代Web应用中被广泛应用,Jetty通过支持异步Servlet等功能,为实现AJAX提供了基础。 **14.3 Comet** Comet是...

    Jetty权威指南.pdf

    Jetty还可以与其他Web服务器(如Apache)配合使用,通过mod_proxy或AJP代理协议将请求转发给Jetty处理。 #### 九、虚拟主机 **9.1 虚拟主机的配置方法** Jetty支持在同一台物理服务器上托管多个虚拟主机。通过...

    基于Spring Boot + NIO实现的电商平台见证宝服务

    1. **高效网络通信**:通过NIO的非阻塞特性,服务器可以同时处理大量客户端请求,而无需为每个请求创建新的线程。 2. **数据流优化**:缓冲区的使用减少了数据在内存和网络之间的拷贝次数,提高了数据传输效率。 3. ...

    jetty 架构

    与许多其他Web服务器不同,Jetty没有采用多层架构,而是采用了基于事件驱动的单线程模型,这使得它在处理高并发请求时表现出色。每个连接都由一个单独的线程负责,避免了线程上下文切换带来的开销,提高了系统性能。...

    jetty-util-6.1.5.jar

    4. **异步I/O**:Jetty使用NIO(非阻塞I/O)模型,提高了服务器处理大量并发连接的能力。Jetty Util包含的类支持这种高效的I/O模型。 5. **HTTP协议处理**:库中包含了处理HTTP头、编码和解码请求与响应的工具类,...

    jetty6.1.6-1

    1. **Servlet支持**:Jetty完全兼容Servlet 2.5规范,能够处理HTTP请求并托管基于Servlet的Web应用程序。Servlet是一种Java API,用于开发动态Web内容。 2. **轻量级和快速**:Jetty设计得非常小巧,对内存和CPU...

    jetty入门

    1. **环境准备**:确保已经安装了Java开发环境,因为Jetty是基于Java的。你可以从Jetty官方网站下载适合版本的Jetty发行版。 2. **理解基本结构**:了解Jetty的基本目录结构,如`webapps`目录用于放置Web应用,`lib...

    jetty内嵌到java代码启动

    Jetty是一款轻量级、高性能的Java Web服务器和Servlet...通过理解和掌握这些关键知识点,你可以轻松地构建和调试基于Jetty的应用。在实际项目中,根据需求选择适当的配置和扩展,可以实现更加灵活、高效的Web服务部署。

    jetty源码查阅

    - 分析请求处理过程,理解`Handler`和`Connector`的角色。 - 针对特定需求,比如性能优化或新功能添加,深入研究相关源码。 通过阅读Jetty源码,你可以了解到Web服务器如何处理网络请求、管理线程、优化内存使用...

    jetty相关所有jar包

    1. **Jetty Server**: 这是Jetty的核心组件,负责处理HTTP请求和响应。它提供了一个可扩展的框架,允许开发者根据需要添加各种模块和服务。 2. **Servlet容器**: Jetty作为Servlet 3.1规范的实现者,可以托管...

    jetty嵌入式Httpserver

    通过阅读源码,我们可以深入理解其内部工作流程,比如请求处理链路、线程池管理、会话管理等高级特性。对于开发者来说,这是一个提升技能的好途径。 在开发过程中,我们还可以利用Jetty的工具类和API进行调试和性能...

    jetty start 9.2.13 项目所需要的完整jar包,免费。jetty启动调试

    7. **jetty-http-9.2.13.v20150730.jar**:实现了HTTP/1.1协议,处理请求和响应,是Jetty与客户端交互的基础。 8. **jetty-security-9.2.13.v20150730.jar**:提供了安全相关的功能,如基本认证、表单认证以及SSL/...

    jetty实施手册

    Jetty使用NIO实现Web请求的异步处理,显著特点是: - **少量线程处理大量并发请求**:传统BIO模型下,每个Socket连接都需要独立的线程进行阻塞式通信,而NIO模型仅需少量线程管理多个连接,大大提升了服务器的处理...

    jetty9.4.6 2017最新 免费下载

    - **jetty-server-9.4.6.v20170531.jar**:这是Jetty的核心服务器组件,负责处理HTTP请求和响应。 - **jetty-http-9.4.6.v20170531.jar**:包含了HTTP协议的实现,处理HTTP协议相关的逻辑。 - **jetty-io-9.4.6.v...

    jetty嵌入项目实战

    - 使用`org.eclipse.jetty.server.Server`类创建服务器实例,并配置监听端口、处理请求的Handler等。 - 配置Servlet容器,例如使用`org.eclipse.jetty.webapp.WebAppContext`加载Web应用上下文。 3. **实战项目...

Global site tag (gtag.js) - Google Analytics