
Tomcat cluster source code analysis

 

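After reading through the Tomcat source code, I couldn't wait to dig into Tomcat clustering. Unfortunately, there is very little source-level analysis of Tomcat clustering online (I searched both Baidu and Google and found essentially nothing beyond the API docs). Most articles only cover how to configure a Tomcat cluster and other surface-level material. So I gritted my teeth, read the code myself, and wrote up these notes.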

 

First, the cluster is started from StandardEngine's start() method (inherited from ContainerBase.start()):

        if ((cluster != null) && (cluster instanceof Lifecycle))
            ((Lifecycle) cluster).start();

Here is the cluster's start method:

    public void start() throws LifecycleException {
        if (started)
            throw new LifecycleException(sm.getString("cluster.alreadyStarted"));
        if (log.isInfoEnabled()) log.info("Cluster is about to start");

        // Notify our interested LifecycleListeners
        lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, this);
        try {
            checkDefaults(); // check the default configuration
            registerClusterValve();
            channel.addMembershipListener(this);
            channel.addChannelListener(this);
            channel.start(channelStartOptions);
            if (clusterDeployer != null) clusterDeployer.start();
            this.started = true;
            // Notify our interested LifecycleListeners
            lifecycle.fireLifecycleEvent(AFTER_START_EVENT, this);
        } catch (Exception x) {
            log.error("Unable to start cluster.", x);
            throw new LifecycleException(x);
        }
    }

checkDefaults();

    protected void checkDefaults() {
        if ( clusterListeners.size() == 0 ) {
            addClusterListener(new JvmRouteSessionIDBinderListener()); 
            addClusterListener(new ClusterSessionListener());
        }
        if ( valves.size() == 0 ) {
            addValve(new JvmRouteBinderValve());
            addValve(new ReplicationValve());
        }
        if ( clusterDeployer != null ) clusterDeployer.setCluster(this);
        if ( channel == null ) channel = new GroupChannel();
        if ( channel instanceof GroupChannel && !((GroupChannel)channel).getInterceptors().hasNext()) {
            channel.addInterceptor(new MessageDispatch15Interceptor());
            channel.addInterceptor(new TcpFailureDetector());
        }
    }
 

These defaults roughly correspond to a configuration like the following:

<Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster" channelSendOptions="6">
	<Manager className="org.apache.catalina.ha.session.BackupManager" expireSessionsOnShutdown="false" notifyListenersOnReplication="true" mapSendOptions="6" />
	<Channel className="org.apache.catalina.tribes.group.GroupChannel">
		<Membership className="org.apache.catalina.tribes.membership.McastService" address="228.0.0.4" port="45564" frequency="500" dropTime="3000" />
		<Receiver className="org.apache.catalina.tribes.transport.nio.NioReceiver" address="auto" port="5000" selectorTimeout="100" maxThreads="6" />
		<Sender className="org.apache.catalina.tribes.transport.ReplicationTransmitter">
			<Transport className="org.apache.catalina.tribes.transport.nio.PooledParallelSender" />
		</Sender>
		<Interceptor className="org.apache.catalina.tribes.group.interceptors.TcpFailureDetector" />
		<Interceptor className="org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor" />
		<Interceptor className="org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor" />
	</Channel>
	<Valve className="org.apache.catalina.ha.tcp.ReplicationValve" filter=".*\.gif;.*\.js;.*\.jpg;.*\.png;.*\.htm;.*\.html;.*\.css;.*\.txt;" />
	<Deployer className="org.apache.catalina.ha.deploy.FarmWarDeployer" tempDir="/tmp/war-temp/" deployDir="/tmp/war-deploy/" watchDir="/tmp/war-listen/" watchEnabled="false" />
	<ClusterListener className="org.apache.catalina.ha.session.ClusterSessionListener" />
</Cluster>

 registerClusterValve();

    protected void registerClusterValve() throws Exception {
        if(container != null ) {
            for (Iterator iter = valves.iterator(); iter.hasNext();) {
                ClusterValve valve = (ClusterValve) iter.next();
                if (log.isDebugEnabled())
                    log.debug("Invoking addValve on " + getContainer()
                            + " with class=" + valve.getClass().getName());
                if (valve != null) {

                    // Add the valve to the Engine's valves; at request time it is invoked through the pipeline.
                    IntrospectionUtils.callMethodN(getContainer(), "addValve",
                            new Object[] { valve },
                            new Class[] { org.apache.catalina.Valve.class });

                }
                valve.setCluster(this);
            }
        }
    }
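What IntrospectionUtils.callMethodN achieves here is essentially a reflective call to the container's addValve(Valve) method, presumably so the cluster code does not have to depend on a specific container class. The sketch below makes a similar call with plain java.lang.reflect. It is not Tomcat's implementation, and Valve and FakeEngine are hypothetical stand-ins for the real org.apache.catalina types.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class AddValveSketch {
    /** Hypothetical stand-in for org.apache.catalina.Valve. */
    public interface Valve {
        String name();
    }

    /** Hypothetical Engine-like container that keeps its valves in a list (its "pipeline"). */
    public static class FakeEngine {
        private final List<Valve> valves = new ArrayList<>();

        public void addValve(Valve valve) {
            valves.add(valve);
        }

        public List<Valve> getValves() {
            return valves;
        }
    }

    /** Looks up addValve(Valve) on the container's runtime class and invokes it reflectively. */
    public static void addValveReflectively(Object container, Valve valve) {
        try {
            Method addValve = container.getClass().getMethod("addValve", Valve.class);
            addValve.invoke(container, valve);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("container has no usable addValve(Valve)", e);
        }
    }

    public static void main(String[] args) {
        FakeEngine engine = new FakeEngine();
        addValveReflectively(engine, () -> "ReplicationValve");
        System.out.println(engine.getValves().get(0).name()); // prints ReplicationValve
    }
}
```

The caller only needs to know the method name and parameter type, not the container's class.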
 

Next, let's look at how the channel is started:

channel.start(channelStartOptions); channelStartOptions defaults to 15, which is 1111 in binary.

    public synchronized void start(int svc) throws ChannelException {
        setupDefaultStack();
        if (optionCheck) checkOptionFlags();
        super.start(svc);
        if ( hbthread == null && heartbeat ) {
            hbthread = new HeartbeatThread(this,heartbeatSleeptime);
            hbthread.start();
        }
    }

It first calls the parent class's start method, super.start(svc);

The call is passed down layer by layer and finally reaches ChannelCoordinator.internalStart():

    /**
     * Starts up the channel. This can be called multiple times for individual services to start
     * The svc parameter can be the logical or value of any constants
     * @param svc int value of <BR>
     * DEFAULT - will start all services <BR>
     * MBR_RX_SEQ - starts the membership receiver <BR>
     * MBR_TX_SEQ - starts the membership broadcaster <BR>
     * SND_TX_SEQ - starts the replication transmitter<BR>
     * SND_RX_SEQ - starts the replication receiver<BR>
     * @throws ChannelException if a startup error occurs or the service is already started.
     */
    protected synchronized void internalStart(int svc) throws ChannelException {
        try {
            boolean valid = false;
            //make sure we don't pass down any flags that are unrelated to the bottom layer
            svc = svc & Channel.DEFAULT;

            if (startLevel == Channel.DEFAULT) return; //we have already started up all components
            if (svc == 0 ) return;//nothing to start
            
            if (svc == (svc & startLevel)) throw new ChannelException("Channel already started for level:"+svc);

            //must start the receiver first so that we can coordinate the port it
            //listens to with the local membership settings
            if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
                clusterReceiver.setMessageListener(this);
                clusterReceiver.start();
                //synchronize, big time FIXME
                membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
                valid = true;
            }
            if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
                clusterSender.start();
                valid = true;
            }
            
            if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
                membershipService.setMembershipListener(this);
                membershipService.start(MembershipService.MBR_RX);
                valid = true;
            }
            if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
                membershipService.start(MembershipService.MBR_TX);
                valid = true;
            }
            
            if ( !valid) {
                throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
            }
            startLevel = (startLevel | svc);
        }catch ( ChannelException cx ) {
            throw cx;
        }catch ( Exception x ) {
            throw new ChannelException(x);
        }
    }

The English comments in the code above are clear enough, so I won't belabor it with further explanation.
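To make the flag arithmetic concrete, here is a minimal sketch of the startLevel bookkeeping in internalStart(). The individual values of the four service constants are my assumption (they add up to the DEFAULT of 15 mentioned above). The method only reports which services would start; it does not start real receivers, senders, or membership services.

```java
public class StartLevelSketch {
    // Assumed values, mirroring the four service flags of org.apache.catalina.tribes.Channel.
    public static final int SND_RX_SEQ = 1; // replication receiver
    public static final int SND_TX_SEQ = 2; // replication transmitter
    public static final int MBR_RX_SEQ = 4; // membership receiver
    public static final int MBR_TX_SEQ = 8; // membership broadcaster
    public static final int DEFAULT = 15;   // 1111 in binary: all four services

    private int startLevel = 0; // bits of the services already started

    /** Returns the names of the services this call would start, in the original's order. */
    public String start(int svc) {
        svc = svc & DEFAULT;                  // drop flags unrelated to the bottom layer
        if (startLevel == DEFAULT) return ""; // everything is already running
        if (svc == 0) return "";              // nothing to start
        if (svc == (svc & startLevel)) {      // every requested service is already up
            throw new IllegalStateException("Channel already started for level:" + svc);
        }
        StringBuilder started = new StringBuilder();
        if ((svc & SND_RX_SEQ) == SND_RX_SEQ) started.append("SND_RX ");
        if ((svc & SND_TX_SEQ) == SND_TX_SEQ) started.append("SND_TX ");
        if ((svc & MBR_RX_SEQ) == MBR_RX_SEQ) started.append("MBR_RX ");
        if ((svc & MBR_TX_SEQ) == MBR_TX_SEQ) started.append("MBR_TX ");
        startLevel |= svc; // remember what is now running
        return started.toString().trim();
    }

    public int getStartLevel() {
        return startLevel;
    }

    public static void main(String[] args) {
        StartLevelSketch channel = new StartLevelSketch();
        System.out.println(channel.start(MBR_RX_SEQ | MBR_TX_SEQ)); // MBR_RX MBR_TX
        System.out.println(channel.start(SND_RX_SEQ | SND_TX_SEQ)); // SND_RX SND_TX
        System.out.println(channel.getStartLevel());                // 15
    }
}
```

Because startLevel accumulates with a bitwise OR, the channel can be started in several partial steps, and asking again for a service that is already running is rejected.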

Next comes:

        if ( hbthread == null && heartbeat ) {
            hbthread = new HeartbeatThread(this,heartbeatSleeptime);
            hbthread.start();
        }

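This starts the heartbeat thread, which periodically (every heartbeatSleeptime milliseconds) calls GroupChannel.heartbeat():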

        super.heartbeat();
        Iterator i = membershipListeners.iterator();
        while ( i.hasNext() ) {
            Object o = i.next();
            if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
        }
        i = channelListeners.iterator();
        while ( i.hasNext() ) {
            Object o = i.next();
            if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
        }
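A rough sketch of the heartbeat thread combined with the listener fan-out above. The Heartbeat interface and the class name are hypothetical stand-ins for Tomcat's types, and the super.heartbeat() call down the interceptor stack is omitted. The point is only that each tick calls heartbeat() on the listeners that implement Heartbeat and skips the rest.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class HeartbeatSketch {
    /** Hypothetical stand-in for org.apache.catalina.tribes.Heartbeat. */
    public interface Heartbeat {
        void heartbeat();
    }

    // Listeners are stored as Object, like the original, and checked with instanceof.
    private final List<Object> membershipListeners = new CopyOnWriteArrayList<>();
    private final List<Object> channelListeners = new CopyOnWriteArrayList<>();

    public void addMembershipListener(Object listener) { membershipListeners.add(listener); }

    public void addChannelListener(Object listener) { channelListeners.add(listener); }

    /** One tick: call heartbeat() on every listener that implements Heartbeat, skip the rest. */
    public void heartbeat() {
        for (Object o : membershipListeners) {
            if (o instanceof Heartbeat) ((Heartbeat) o).heartbeat();
        }
        for (Object o : channelListeners) {
            if (o instanceof Heartbeat) ((Heartbeat) o).heartbeat();
        }
    }

    /** Starts a daemon thread that calls heartbeat() every sleepMillis until interrupted. */
    public Thread startHeartbeatThread(long sleepMillis) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    return; // interrupted: stop the loop
                }
                heartbeat();
            }
        }, "Heartbeat-sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatSketch channel = new HeartbeatSketch();
        channel.addMembershipListener((Heartbeat) () -> System.out.println("tick"));
        channel.addChannelListener("not a Heartbeat, ignored");
        Thread t = channel.startHeartbeatThread(100);
        Thread.sleep(350); // let a few ticks happen
        t.interrupt();
    }
}
```

CopyOnWriteArrayList is used so that listeners can be added while the heartbeat thread is iterating without a ConcurrentModificationException.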

super.heartbeat() passes the call down the interceptor stack; in TcpFailureDetector, heartbeat() looks like this:

        super.heartbeat();
        checkMembers(false);

checkMembers(boolean checkAll)

    public void checkMembers(boolean checkAll) {
        
        try {
            if (membership == null) setupMembership();
            synchronized (membership) {
                if ( !checkAll ) performBasicCheck();
                else performForcedCheck();
            }
        }catch ( Exception x ) {
            log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
        } finally {
            
        }
    }

performBasicCheck()

    protected void performBasicCheck() {
        //update all alive times
        Member[] members = super.getMembers();
        for (int i = 0; members != null && i < members.length; i++) {
            if (membership.memberAlive( (MemberImpl) members[i])) { // updates this member's latest heartbeat timestamp, lastHeardFrom
                //we don't have this one in our membership, check to see if he/she is alive
                if (memberAlive(members[i])) {
                    log.warn("Member added, even though we werent notified:" + members[i]);
                    super.memberAdded(members[i]);
                } else {
                    membership.removeMember( (MemberImpl) members[i]);
                } //end if
            } //end if
        } //for

        //check suspect members if they are still alive,
        //if not, simply issue the memberDisappeared message
        MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
        for (int i = 0; i < keys.length; i++) {
            MemberImpl m = (MemberImpl) keys[i];
            if (membership.getMember(m) != null && (!memberAlive(m))) {
                membership.removeMember(m);
                super.memberDisappeared(m);
                removeSuspects.remove(m);
                if(log.isInfoEnabled())
                    log.info("Suspect member, confirmed dead.["+m+"]");
            } //end if
        }

        //check add suspects members if they are alive now,
        //if they are, simply issue the memberAdded message
        keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
        for (int i = 0; i < keys.length; i++) {
            MemberImpl m = (MemberImpl) keys[i];
            if ( membership.getMember(m) == null && (memberAlive(m))) {
                membership.memberAlive(m);
                super.memberAdded(m);
                addSuspects.remove(m);
                if(log.isInfoEnabled())
                    log.info("Suspect member, confirmed alive.["+m+"]");
            } //end if
        }
    }
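The suspect handling in the second half of performBasicCheck() boils down to the following sketch. Members are plain strings, the suspect maps become sets, and memberAlive is a caller-supplied predicate standing in for the memberAlive(Member) check used in the original. All of these are simplifications, not the real classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class SuspectCheckSketch {
    private final Set<String> membership = new HashSet<>();
    private final Set<String> removeSuspects = new HashSet<>(); // suspected dead
    private final Set<String> addSuspects = new HashSet<>();    // suspected (re)joined
    private final List<String> events = new ArrayList<>();      // records notifications
    private final Predicate<String> memberAlive;                // hypothetical liveness check

    public SuspectCheckSketch(Predicate<String> memberAlive) {
        this.memberAlive = memberAlive;
    }

    public void addMember(String m) { membership.add(m); }
    public void suspectDead(String m) { removeSuspects.add(m); }
    public void suspectAlive(String m) { addSuspects.add(m); }
    public Set<String> members() { return membership; }
    public List<String> events() { return events; }

    public void performBasicCheck() {
        // Copy to an array first so entries can be removed while looping, as the original does.
        for (String m : removeSuspects.toArray(new String[0])) {
            // still a member but no longer alive: confirmed dead
            if (membership.contains(m) && !memberAlive.test(m)) {
                membership.remove(m);
                removeSuspects.remove(m);
                events.add("memberDisappeared " + m);
            }
        }
        for (String m : addSuspects.toArray(new String[0])) {
            // not yet a member but alive now: confirmed alive
            if (!membership.contains(m) && memberAlive.test(m)) {
                membership.add(m);
                addSuspects.remove(m);
                events.add("memberAdded " + m);
            }
        }
    }

    public static void main(String[] args) {
        Set<String> reachable = new HashSet<>(List.of("A", "C"));
        SuspectCheckSketch fd = new SuspectCheckSketch(reachable::contains);
        fd.addMember("A");
        fd.addMember("B");
        fd.suspectDead("A");  // A turns out to be alive, so it stays
        fd.suspectDead("B");  // B is unreachable, so it is removed
        fd.suspectAlive("C"); // C is reachable, so it is added
        fd.performBasicCheck();
        System.out.println(fd.events()); // [memberDisappeared B, memberAdded C]
    }
}
```

As in the original, a suspect that is not confirmed stays in its set and is checked again on the next heartbeat.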
 

This part is rather convoluted...

Threads started:

Daemon Thread [NioReceiver]

Daemon Thread [Tribes-MembershipReceiver]

Daemon Thread [Tribes-MembershipSender]

Daemon Thread [GroupChannel-Heartbeat-1]

 

To summarize:

 

Every 500 ms, each node multicasts its own member information.
Each node receives these multicast messages.

After both sending and receiving, the following method is called:

    protected void checkExpired() {
        synchronized (expiredMutex) {
            // timeToExpiration defaults to 3 seconds
            MemberImpl[] expired = membership.expire(timeToExpiration);
            // nodes that stop sending multicast announcements are considered failed and removed from the cluster
            for (int i = 0; i < expired.length; i++) {
                final MemberImpl member = expired[i];
                if (log.isDebugEnabled())
                    log.debug("Mcast exipre  member " + expired[i]);
                try {
                    // use a separate thread to remove the failed node from the cluster (removing a node involves a series of steps)
                    Thread t = new Thread() {
                        public void run() {
                            service.memberDisappeared(member);
                        }
                    };
                    t.start();
                } catch (Exception x) {
                    log.error("Unable to process member disappeared message.", x);
                }
            }
        }
    }

expire() collects the members that have not been heard from for longer than the limit (3 seconds by default):

    /**
     * Runs a refresh cycle and returns a list of members that has expired.
     * This also removes the members from the membership, in such a way that
     * getMembers() = getMembers() - expire()
     * @param maxtime - the max time a member can remain unannounced before it is considered dead.
     * @return the list of expired members
     */
    public synchronized MemberImpl[] expire(long maxtime) {
        if(!hasMembers() )
           return EMPTY_MEMBERS;
       
        ArrayList list = null;
        Iterator i = map.values().iterator();
        while(i.hasNext()) {
            MbrEntry entry = (MbrEntry)i.next();
            if( entry.hasExpired(maxtime) ) {
                if(list == null) // only need a list when members are expired (smaller gc)
                    list = new java.util.ArrayList();
                list.add(entry.getMember());
            }
        }
        
        if(list != null) {
            MemberImpl[] result = new MemberImpl[list.size()];
            list.toArray(result);
            for( int j=0; j<result.length; j++) {
                removeMember(result[j]);
            }
            return result;
        } else {
            return EMPTY_MEMBERS ;
        }
    }

Checking whether a member has expired:

        /**
         * Check if this dude has expired
         * @param maxtime The time threshold
         */
        public boolean hasExpired(long maxtime) {
            long delta = System.currentTimeMillis() - lastHeardFrom;
            return delta > maxtime;
        }
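Taken together, refreshing lastHeardFrom on every announcement and calling expire() is a simple timeout-based failure detector. The sketch below assumes string member IDs and passes the current time in explicitly (instead of reading System.currentTimeMillis()) so its behavior is deterministic. It also removes expired entries through the iterator instead of collecting them first, which is a simplification of the original.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class MembershipExpirySketch {
    /** Per-member entry recording when we last heard from it, like MbrEntry.lastHeardFrom. */
    static final class Entry {
        final String member;
        long lastHeardFrom;

        Entry(String member, long now) {
            this.member = member;
            this.lastHeardFrom = now;
        }

        boolean hasExpired(long maxtime, long now) {
            return now - lastHeardFrom > maxtime; // same test as hasExpired() above
        }
    }

    private final Map<String, Entry> map = new HashMap<>();

    /** Called for each multicast announcement: add the member or refresh its timestamp. */
    public synchronized void memberAlive(String member, long now) {
        Entry e = map.get(member);
        if (e == null) map.put(member, new Entry(member, now));
        else e.lastHeardFrom = now;
    }

    /** Removes and returns every member not heard from for more than maxtime ms. */
    public synchronized List<String> expire(long maxtime, long now) {
        List<String> expired = new ArrayList<>();
        Iterator<Entry> it = map.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.hasExpired(maxtime, now)) {
                expired.add(e.member);
                it.remove();
            }
        }
        return expired;
    }

    public synchronized int size() {
        return map.size();
    }

    public static void main(String[] args) {
        MembershipExpirySketch ms = new MembershipExpirySketch();
        ms.memberAlive("A", 0);
        ms.memberAlive("B", 0);
        ms.memberAlive("A", 2500);                    // A announces again, B stays silent
        System.out.println(ms.expire(3000, 3500));    // [B]
    }
}
```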

Next comes session replication: a background thread started by the Engine keeps running the following job to synchronize sessions:

Here is the addToQueue method:

    public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
        final LinkObject obj = new LinkObject(msg,destination,payload);
        Runnable r = new Runnable() {
            public void run() {
                sendAsyncData(obj);
            }
        };
        executor.execute(r);
        return true;
    }
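addToQueue() wraps the message in a Runnable and hands it to an executor, so the caller returns immediately while a pool thread does the actual send. Below is a minimal sketch of that pattern. It assumes a fixed-size pool and a generic Consumer in place of sendAsyncData(); the real interceptor's executor configuration may differ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncDispatchSketch<M> {
    private final ExecutorService executor;
    private final Consumer<M> sender; // hypothetical stand-in for sendAsyncData(...)

    public AsyncDispatchSketch(int threads, Consumer<M> sender) {
        this.executor = Executors.newFixedThreadPool(threads);
        this.sender = sender;
    }

    /** Like addToQueue(): wrap the send in a task, hand it to the pool, return immediately. */
    public boolean addToQueue(M msg) {
        executor.execute(() -> sender.accept(msg));
        return true;
    }

    /** Stops accepting work and waits up to timeoutMillis for queued sends to finish. */
    public boolean shutdownAndWait(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        AsyncDispatchSketch<String> dispatcher =
                new AsyncDispatchSketch<>(2, msg -> System.out.println("sent " + msg));
        for (int i = 0; i < 3; i++) {
            dispatcher.addToQueue("session-delta-" + i); // returns at once
        }
        dispatcher.shutdownAndWait(1000);
    }
}
```

With more than one pool thread, the messages may be sent in a different order than they were queued.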

sendAsyncData(obj) sends the message asynchronously:

Sending the message over NIO (ParallelNioSender.doLoop()):

    private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) throws IOException, ChannelException {
        int completed = 0;
        int selectedKeys = selector.select(selectTimeOut);
        
        if (selectedKeys == 0) {
            return 0;
        }
        
        Iterator it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey sk = (SelectionKey) it.next();
            it.remove();
            int readyOps = sk.readyOps();
            sk.interestOps(sk.interestOps() & ~readyOps);
            NioSender sender = (NioSender) sk.attachment();
            try {
                if (sender.process(sk,waitForAck)) {
                    completed++;
                    sender.setComplete(true);
                    if ( Logs.MESSAGES.isTraceEnabled() ) {
                        Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+sender.getDestination().getName());
                    }
                    SenderState.getSenderState(sender.getDestination()).setReady();
                }//end if
            } catch (Exception x) {
                SenderState state = SenderState.getSenderState(sender.getDestination());
                int attempt = sender.getAttempt()+1;
                boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0);
                synchronized (state) {
                
                    //sk.cancel();
                    if (state.isSuspect()) state.setFailing();
                    if (state.isReady()) {
                        state.setSuspect();
                        if ( retry ) 
                            log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying.");
                        else 
                            log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x);
                    }                    
                }
                if ( !isConnected() ) {
                    log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");
                    ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);
                    cx.addFaultyMember(sender.getDestination(),x);
                    throw cx;
                }
                
                byte[] data = sender.getMessage();
                if ( retry ) {
                    try { 
                        sender.disconnect(); 
                        sender.connect();
                        sender.setAttempt(attempt);
                        sender.setMessage(data);
                    }catch ( Exception ignore){
                        state.setFailing();
                    }
                } else {
                    ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+" max:"+maxAttempts,x);
                    cx.addFaultyMember(sender.getDestination(),x);
                    throw cx;
                }//end if
            }
        }
        return completed;

    }
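The catch block above implements a small per-destination state machine. The first failure moves a destination from ready to suspect, a further failure moves it to failing, and a successful send resets it to ready. A retry is allowed while attempt <= maxAttempts and maxAttempts > 0. The sketch below isolates only that logic. The class and enum names are hypothetical, and it does not model Tomcat's SenderState class itself.

```java
public class SenderStateSketch {
    public enum State { READY, SUSPECT, FAILING }

    private State state = State.READY;

    /** A successful send puts the destination back to READY (like setReady()). */
    public synchronized void onSuccess() {
        state = State.READY;
    }

    /**
     * Mirrors the catch block: SUSPECT becomes FAILING, READY becomes SUSPECT.
     * Returns whether the caller should reconnect and retry.
     */
    public synchronized boolean onFailure(int attempt, int maxAttempts) {
        if (state == State.SUSPECT) state = State.FAILING;
        else if (state == State.READY) state = State.SUSPECT;
        return attempt <= maxAttempts && maxAttempts > 0;
    }

    public synchronized State get() {
        return state;
    }

    public static void main(String[] args) {
        SenderStateSketch s = new SenderStateSketch();
        System.out.println(s.onFailure(0, 1) + " " + s.get()); // true SUSPECT
        System.out.println(s.onFailure(1, 1) + " " + s.get()); // true FAILING
        System.out.println(s.onFailure(2, 1) + " " + s.get()); // false FAILING
        s.onSuccess();
        System.out.println(s.get());                           // READY
    }
}
```

Once the retry budget is used up, the original gives up and throws a ChannelException that records the faulty member.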

Everything above is what gets started when Tomcat boots. Session replication while requests are being served involves one more step:

That roughly wraps up this analysis of the Tomcat cluster source code.

