`

Jetty如何实现NIO分析(三)

阅读更多

需要了解知识:

    1.IO模型:参考  IO与操作系统关系(一)   JAVA几种IO工作机制及特点(二) 

    2.jetty容器: 参考 JETTY基本架构

 

1.jetty 模块分析

详细参考官网:http://wiki.eclipse.org/Jetty/Reference/Dependencies  (jetty模块依赖)

1.1 jetty依赖树:

Dependencies.jpg

 

This diagram shows the compile dependencies for the Jetty project. The external dependencies are listed on the right hand side and all other modules shown are part of the project. 

 

1.2 jetty核心模块(http客户端 服务端通讯模块)

(官方解释)The jetty-util, jetty-io and jetty-http jars form the core of the jetty HTTP handler (generation and parsing) that is used for both the jetty-client and the jetty-server。

如下图:

我们平常最关心的就是 client 如何 server进行通讯,如何实现io通讯的,所以以下几个模块需要了解:

jetty-client:

jetty-server:

jetty-http:

jetty-io:

jetty-util:

 

1.3 jetty模块结构分析

注:下面讲解jetty核心代码类是用到jetty8中的selectChannelConnector,这里大家参考jetty8 API,jetty9已更换实现类为ServerConnector: 

jetty8 api:http://download.eclipse.org/jetty/8.1.17.v20150415/apidocs/ 

 

     1.3.1 添加依赖jetty插件,从search.maven.org 下载:

     

<!-- jetty -->
<dependency>
      <groupId>org.eclipse.jetty.aggregate</groupId>
      <artifactId>jetty-all-server</artifactId>
      <version>8.1.18.v20150929</version>
</dependency>
   依赖组件如下:

 

  

 

     1.3.2 Jetty Connector的实现类图:

 

      

    其中:

     SelectChannelConnector 负责组装各组件

     SelectSet 负责侦听客户端请求

     SelectChannelEndPoint 负责IO的读和写

     HttpConnection 负责逻辑处理

    在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

    阶段一:监听并建立连接

这一过程主要是启动一个线程负责accept新连接,监听到后分配给相应的SelectSet,分配的策略就是轮询。

SelectChannelConnector核心方法:

 1.创建SelectorManager 用于接收客户端请求 dispatch()

    
   private final SelectorManager _manager = new ConnectorSelectorManager();
    /*
     * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
     */
    @Override
    protected void doStart() throws Exception
    {
        _manager.setSelectSets(getAcceptors());
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());

        super.doStart();// 1.1会调用open方法启动server监听端口1.2 同时创建线程池接收连接
    }

/* --------------------------1.1 启动监听端口port---------------------------------- */
    public void open() throws IOException
    {
        synchronized(this)
        {
            if (_acceptChannel == null)
            {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();
                // Set to blocking mode
                _acceptChannel.configureBlocking(true);

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                _localPort=_acceptChannel.socket().getLocalPort();
                if (_localPort<=0)
                    throw new IOException("Server channel not bound");

                addBean(_acceptChannel);
            }
        }
    }

/* ----------------------1.2 启动一个线程池监听 客户端连接,默认是1个---------------------- */
    @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel server;
        synchronized(this)
        {
            server = _acceptChannel;
        }

        if (server!=null && server.isOpen() && _manager.isStarted())
        {
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Socket socket = channel.socket();
            configure(socket);
            _manager.register(channel); //SelectorManager管理所有channel
        }
    }
    /** Register a channel 注册channel到selectSet
     * @param channel
     */
    public void register(SocketChannel channel)
    {
        // The ++ increment here is not atomic, but it does not matter.
        // so long as the value changes sometimes, then connections will
        // be distributed over the available sets.

        int s=_set++;
        if (s<0)
            s=-s;
        s=s%_selectSets;
        SelectSet[] sets=_selectSet;
        if (sets!=null)
        {
            SelectSet set=sets[s];
            set.addChange(channel);
            set.wakeup();
        }
    }

 

 

阶段二:监听客户端的请求

 

这一过程主要是启动多个线程(线程数一般为服务器CPU的个数,dubbo的nio配置 cpu核数+1),让SelectSet监听所管辖的channel队列,每个SelectSet维护一个Selector,这个Selector监听队列里所有的channel,一旦有读事件,从线程池里拿线程去做处理请求.

selectorManager交由线程去处理 dispatch()

/* ------------------------------------------------------------ */
    /* (non-Javadoc)
     * @see org.eclipse.component.AbstractLifeCycle#doStart()
     */
    @Override
    protected void doStart() throws Exception
    {
        _selectSet = new SelectSet[_selectSets];
        for (int i=0;i<_selectSet.length;i++)
            _selectSet[i]= new SelectSet(i);

        super.doStart();

        // start a thread to Select
        for (int i=0;i<getSelectSets();i++)
        {
            final int id=i;
            boolean selecting=dispatch(new Runnable()
            {
                public void run()
                {
                    String name=Thread.currentThread().getName();
                    int priority=Thread.currentThread().getPriority();
                    try
                    {
                        SelectSet[] sets=_selectSet;
                        if (sets==null)
                            return;
                        SelectSet set=sets[id];

                        Thread.currentThread().setName(name+" Selector"+id);//按selector创建
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
                        LOG.debug("Starting {} on {}",Thread.currentThread(),this);
                        while (isRunning())
                        {
                            try
                            {
                                set.doSelect();//Select and dispatch tasks
                            }
                            catch(IOException e)
                            {
                                LOG.ignore(e);
                            }
                            catch(Exception e)
                            {
                                LOG.warn(e);
                            }
                        }
                    }
                    finally
                    {
                        LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
                        Thread.currentThread().setName(name);
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(priority);
                    }
                }

            });

            if (!selecting)
                throw new IllegalStateException("!Selecting");
        }
    }

/* ------------------------SelectChannelEndPoint 分发处理----------------------------- */
    public void dispatch()
    {
        synchronized(this)
        {
            if (_state<=STATE_UNDISPATCHED)
            {
                if (_onIdle)
                    _state = STATE_NEEDS_DISPATCH;
                else
                {
                    _state = STATE_DISPATCHED;
                    boolean dispatched = _manager.dispatch(_handler);
                    if(!dispatched)
                    {
                        _state = STATE_NEEDS_DISPATCH;
                        LOG.warn("Dispatched Failed! "+this+" to "+_manager);
                        updateKey();
                    }
                }
            }
        }
    }

 

阶段三:处理请求

 

这一过程就是每次客户端请求的数据处理过程,值得注意的是为了不让后端的业务处理阻碍Selector监听新的请求,就多线程来分隔开监听请求和处理请求两个阶段。

 

/* ------------------------------------------------------------ */
    /*
     */
    protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection)_connection.handle();
                        if (next!=_connection)
                        {
                            LOG.debug("{} replaced {}",next,_connection);
                            Connection old=_connection;
                            _connection=next;
                            _manager.endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    LOG.ignore(e);
                }
                catch (EofException e)
                {
                    LOG.debug("EOF", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (IOException e)
                {
                    LOG.warn(e.toString());
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (Throwable e)
                {
                    LOG.warn("handle failed", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                finally
                {
                    if (!_ishut && isInputShutdown() && isOpen())
                    {
                        _ishut=true;
                        try
                        {
                            _connection.onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            LOG.warn("onInputShutdown failed", x);
                            try{close();}
                            catch(IOException e2){LOG.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    LOG.warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }

由此可以大致总结出Jetty有关NIO使用的模式,如下图所示:

1).监听并建立连接

2).监听客户端请求

 

3).请求的处理

最核心就是把三件不同的事情隔离开,并用不同规模的线程去处理,最大限度地利用NIO的异步和通知特性。 

 

 

1.4 启动jetty服务,观察容器线程池

 

 

 

 

 

 

参考博客:

 

 

 

 

  • 大小: 27.3 KB
  • 大小: 261.1 KB
  • 大小: 8.2 KB
  • 大小: 11.2 KB
  • 大小: 29 KB
分享到:
评论

相关推荐

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式

    首先,Jetty的NIO实现主要由以下几个关键组件组成: 1. **SelectChannelConnector**:负责整合各个组件,创建和管理连接。 2. **SelectSet**:监控客户端的请求,使用Selector监听channel队列。 3. **...

    jetty 8及依赖包

    通过分析源代码和实验,你可以了解到如何自定义配置Jetty以适应特定的应用需求,例如调整线程池大小、添加自定义过滤器或者实现WebSocket端点。 总的来说,这个压缩包是一个极好的起点,无论是对Java Web开发初学者...

    jetty-4.2.24

    3. **高并发**:Jetty采用NIO(非阻塞I/O)模型,能够处理大量并发连接,从而提高性能。 4. **模块化**:Jetty的组件设计为可插拔,用户可以根据需要选择和配置必要的模块。 5. **WebSocket支持**:Jetty很早就...

    jetty服务器性能调整过程分析

    Jetty服务器采用了非阻塞I/O(NIO)加线程池的技术方案来实现在高并发场景下的高性能表现。本篇文章的目标是通过调整Jetty服务器的各项配置参数,来观察并评估其对服务器性能的影响,并进一步提炼出一套适用于服务端...

    Jetty中文手册

    如何部署第三方产品 部署展开形式的web应用 使用Jetty进行开发 如何使用Jetty进行开发 如何编写Jetty中的Handlers 使用构建工具 如何在Maven中使用Jetty 如何在Ant中使用Jetty Maven和Ant的更多支持 Jetty Maven插件...

    jetty所需jar包

    7. **jetty-io.jar**:提供了低级别的网络I/O操作,如ByteBuffer和NIO相关的工具。 8. **jetty-webapp.jar**:处理Web应用程序的部署和管理,支持WAR文件的部署和解压。 9. **jetty-jndi.jar**:如果需要JNDI...

    i-jetty源码

    1. **事件驱动**:i-jetty采用非阻塞I/O(NIO)模型,基于Java的Selector API,以事件驱动的方式处理网络通信,提高系统资源利用率。 2. **模块化**:i-jetty的架构高度模块化,每个功能组件都可以独立加载和卸载,...

    jetty-all.jar

    它包括了异步I/O、NIO和线程池等相关实现,使得Jetty能够高效地处理大量的并发连接。 最后,jetty-webapp-9.4.14.v20181114.jar提供了对Web应用程序的支持。它包含了解析和加载WAR文件、管理Web应用上下文...

    jetty-6.1.26.zip

    8. **连接器和适配器**:Jetty提供了多种连接器(如NIO、SelectChannel和HTTP/2)来适应不同的I/O模型。这些连接器和适配器增强了Jetty对不同网络环境的适应性。 9. **安全性**:Jetty提供了安全模块,支持基本的...

    jetty6.1.6-2

    4. **线程模型**:Jetty采用高效的线程模型,如NIO(非阻塞I/O)或EPOLL(在Linux上),以提高并发处理能力,这对于高流量的Web应用至关重要。 5. **WebSocket支持**:如果lib目录包含websocket相关的JAR,例如...

    jetty-6.1.26源码

    通过分析Jetty 6.1.26的源码,开发者可以深入理解Web服务器的工作原理,这对于优化性能、自定义行为或解决特定问题都有极大的帮助。同时,这也有助于开发者更好地过渡到Jetty的更新版本,因为许多基础架构和设计原则...

    jetty包2(lib目录)

    这个目录包含了Jetty自身的核心组件和其他依赖的第三方库,它们对于理解Jetty的工作原理和如何配置环境至关重要。 1. **Jetty核心组件**: `lib`目录下的jar文件包括了Jetty的主要模块,如Jetty HTTP服务器、...

    jetty相关所有jar包

    8. **连接器**: Jetty支持多种类型的连接器,如NIO(非阻塞I/O)和EPOLL(用于Linux系统的高效I/O),以适应不同的操作系统和网络环境。 9. **模块化设计**: Jetty的模块化结构使得只加载需要的组件成为可能,从而...

    jetty源代码下载

    Jetty是一款轻量级、高性能的Java Web服务器和Servlet容器,它被广泛应用于各种规模的项目,从微型嵌入式应用到大型企业级系统。...通过分析Jetty的实现,你可以学习到如何设计和优化高并发、高性能的网络服务。

    jetty 适合jdk1.8用的服务器

    1. **性能优化**:Jetty以其高效的性能而著称,它使用非阻塞I/O模型,如NIO和EPOLL,这使得它在处理大量并发连接时表现出色。 2. **轻量级**:Jetty没有像Tomcat那样的复杂配置,它的设计目标是简洁,因此启动快速,...

    Jetty权威指南.pdf

    &lt;New id="httpConnector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"&gt; &lt;Set name="host"&gt;localhost &lt;Set name="port"&gt;8080 ``` **4.3 org.mortbay.xml.XmlConfiguration** `org....

    jetty 嵌入式开发源码

    - `Connector`: 这部分源码主要关注网络通信,如`HttpConnection`和`SelectChannelConnector`,它们实现了NIO(非阻塞I/O)或者EPOLL(Linux下的高效I/O模型)。 - `Handler`: Jetty提供了多种Handler实现,如`...

    jetty简单启动web服务第二版

    - Jetty使用NIO(非阻塞I/O)模型,能够高效处理大量并发连接。 - 它的线程池设计允许动态调整工作线程数量,以应对不同负载情况。 7. **安全与认证** - Jetty提供了安全模块,可以实现基本的HTTP身份验证,如...

    jetty嵌入式服务器实例大全

    Jetty是Servlet规范的实现者,支持Servlet 3.1及以上版本。你可以通过创建`Server`对象,然后添加`ServletHandler`或`ServletContextHandler`来注册Servlet。例如,使用`addServlet()`方法指定Servlet类和映射路径...

    jetty-distribution-9.1.0.v20131115

    3. **高性能**:Jetty使用NIO(非阻塞I/O)模型,提高了并发处理能力,适用于高流量的Web应用。 4. **模块化**:Jetty的组件设计允许用户只选择需要的部分,避免了不必要的资源消耗。 5. **兼容性**:Jetty完全支持...

Global site tag (gtag.js) - Google Analytics