`
nod0620
  • 浏览: 20003 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

tomcat session复制(一)

阅读更多

 tomcat的session复制大致分两种:all-to-all和backup,先看all-to-all,主要是记录下自己读源代码的心得和代码流程

 

 tomcat集群配置暂时略过。

     在tomcat的启动过程中,找到seesion复制的入口,在StandardContext的start()方法中:

 

            Manager contextManager = null;
                if (manager == null) {
                    if ( (getCluster() != null) && distributable) {
                        try {
                            contextManager = getCluster().createManager(getName());
                        } catch (Exception ex) {
                            log.error("standardContext.clusterFail", ex);
                            ok = false;
                        }
                    } else {
                        contextManager = new StandardManager();
                    }
                } 
                
                // Configure default manager if none was specified
                if (contextManager != null) {
                    setManager(contextManager);
                }

                if (manager!=null && (getCluster() != null) && distributable) {
                    //let the cluster know that there is a context that is distributable
                    //and that it has its own manager
                    getCluster().registerManager(manager);
                }

      如果配置了tomcat的集群,那么 contextManager = getCluster().createManager(getName())就会被调用,返回的Manager的类型是DeltaManager,接着调用setManager(contextManager)方法:

 

    public synchronized void setManager(Manager manager) {

        // Change components if necessary  管理session的manager
        Manager oldManager = this.manager;
        if (oldManager == manager)
            return;
        this.manager = manager;

        // Stop the old component if necessary
        if (started && (oldManager != null) &&
            (oldManager instanceof Lifecycle)) {
            try {
                ((Lifecycle) oldManager).stop();
            } catch (LifecycleException e) {
                log.error("ContainerBase.setManager: stop: ", e);
            }
        }

        // Start the new component if necessary
        if (manager != null)
            manager.setContainer(this);
        if (started && (manager != null) &&
            (manager instanceof Lifecycle)) {
            try {
                ((Lifecycle) manager).start();
            } catch (LifecycleException e) {
                log.error("ContainerBase.setManager: start: ", e);
            }
        }

        // Report this property change to interested listeners
        support.firePropertyChange("manager", oldManager, this.manager);

    }

   看到红色部分,这里调用了DeltaManager的start()方法,也就是启动了DeltaManager,看代码:

 public void start() throws LifecycleException {
        if (!initialized) init();

        // Validate and update our current component state
        if (started) {
            return;
        }
        started = true;
        lifecycle.fireLifecycleEvent(START_EVENT, null);

        // Force initialization of the random number generator
        generateSessionId();

        // Load unloaded sessions, if any
        try {
            //the channel is already running
            Cluster cluster = getCluster() ;
            // stop remove cluster binding
            //wow, how many nested levels of if statements can we have ;)
            if(cluster == null) {
                Container context = getContainer() ;
                if(context != null && context instanceof Context) {
                     Container host = context.getParent() ;
                     if(host != null && host instanceof Host) {
                         cluster = host.getCluster();
                         if(cluster != null && cluster instanceof CatalinaCluster) {
                             setCluster((CatalinaCluster) cluster) ;
                         } else {
                             Container engine = host.getParent() ;
                             if(engine != null && engine instanceof Engine) {
                                 cluster = engine.getCluster();
                                 if(cluster != null && cluster instanceof CatalinaCluster) {
                                     setCluster((CatalinaCluster) cluster) ;
                                 }
                             } else {
                                     cluster = null ;
                             }
                         }
                     }
                }
            }
            if (cluster == null) {
                log.error(sm.getString("deltaManager.noCluster", getName()));
                return;
            } else {
                if (log.isInfoEnabled()) {
                    String type = "unknown" ;
                    if( cluster.getContainer() instanceof Host){
                        type = "Host" ;
                    } else if( cluster.getContainer() instanceof Engine){
                        type = "Engine" ;
                    }
                    log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
                }
            }
            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
            //to survice context reloads, as only a stop/start is called, not
            // createManager
            cluster.registerManager(this);

            //随机找一个节点,然后复制所有的session过来
            getAllClusterSessions();






        } catch (Throwable t) {
            log.error(sm.getString("deltaManager.managerLoad"), t);
        }
    }
 

   首先,如果没有初始化,那么进行初始化,接着设置Cluster,然后调用Cluster的registerManager()方法,注册Manager,这里没有看明白,因为在StandardContext的start()方法中也有这个方法的调用,有重复的嫌疑?

   最重要的是调用getAllClusterSessions()方法,这个方法的作用是,在集群中取到第一台tomcat的member,发送消息要求获得对方的所有session数据,对方收到消息,进行处理发送所有数据,这台tomcat收到数据,之后发送数据告知已经收到数据,双方通信结束。看代码:

 

    public synchronized void getAllClusterSessions() {
        if (cluster != null && cluster.getMembers().length > 0) {
            long beforeSendTime = System.currentTimeMillis();
            Member mbr = findSessionMasterMember();





            if(mbr == null) { // No domain member found
                 return;
            }
            SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
            // set reference time
            stateTransferCreateSendTime = beforeSendTime ;
            // request session state
            counterSend_EVT_GET_ALL_SESSIONS++;
            stateTransfered = false ;
            // FIXME This send call block the deploy thread, when sender waitForAck is enabled
            try {
                synchronized(receivedMessageQueue) {
                     receiverQueue = true ;
                }
                cluster.send(msg, mbr);





                if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr,getStateTransferTimeout()));
                // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
                waitForSendAllSessions(beforeSendTime);





            } finally {
                synchronized(receivedMessageQueue) {
                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
                        SessionMessage smsg = (SessionMessage) iter.next();
                        if (!stateTimestampDrop) {
                            messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                        } else {
                            if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
                                // FIXME handle EVT_GET_ALL_SESSIONS later
                                messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                            } else {
                                if (log.isWarnEnabled()) {
                                    log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
                                }
                            }
                        }
                    }        
                    receivedMessageQueue.clear();
                    receiverQueue = false ;
                }
           }
        } else {
            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
        }
    }

      首先,需要找到集群当中的一个tomcat,因为all-to-all复制中所有的tomcat都是对等的,所以第一个就可以了.

之后,生成一个EVT_GET_ALL_SESSIONS的消息,调用Manager的属性cluster的send()给集群中的tomcat发送要求得到所有session的消息,这个过程是异步的,所以在下面有waitForSendAllSessions()方法的调用,如果在60s内集群里面的tomcat没有给这个tomcat回应,那么这个tomcat就timeout,不能加入集群了。

 

      我们先看SimpleTcpCluster的send()方法:

 

    public void send(ClusterMessage msg, Member dest) {
        try {
            msg.setAddress(getLocalMember());
            if (dest != null) {
                if (!getLocalMember().equals(dest)) {
                    channel.send(new Member[] {dest}, msg,channelSendOptions);





                } else
                    log.error("Unable to send message to local member " + msg);
            } else {
                if (channel.getMembers().length>0)
                    channel.send(channel.getMembers(),msg,channelSendOptions);
                else if (log.isDebugEnabled()) 
                    log.debug("No members in cluster, ignoring message:"+msg);
            }
        } catch (Exception x) {
            log.error("Unable to send message through cluster sender.", x);
        }
    }

   实际调用的是GroupChannel的send()方法,另外发送数据的类别是异步的(SEND_OPTIONS_ASYNCHRONOUS)

  接着调用的是ChannelInterceptor的sendMessage()方法,这是个责任链模式的应用,其实默认的ChannelInterceptor只有两个:MessageDispatch15Interceptor和TcpFailureDetector。MessageDispatch15Interceptor只是在开了另外一个线程后,在一定条件下在该线程中进行sendMessage()方法调用。TcpFailureDetector的作用是处理sendMessage()方法出异常后对应的集群中的Member。最后面调用的ChannelCoordinator的sendMessage()方法:

    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
        if ( destination == null ) destination = membershipService.getMembers();
        clusterSender.sendMessage(msg,destination);
        if ( Logs.MESSAGES.isTraceEnabled() ) {
            Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
        }
    }
 

 其实调用的是ParallelNioSender的sendMessage()方法,最终调用的是NioSender类进行底层的处理。到这里刚刚启动的tomcat的请求已经完成,这个时候在集群的中的一台tomcat收到消息,进行相应处理,接收消息的类是NioReceiver

 

让我们转换思想,转换到接收方进行考虑:

 

NioReceiver在前面的博文中已经详细讲过,其实他就是个后台线程,在不断的跑。在有消息进来,接收到后,每个消息产生一个Runnable任务,然后交给线程池处理。看Runnable任务的NioReplicationTask的drainChannel()方法中,有一段代码:

 //process the message  ReceiverBase.messageDataReceived()
                getCallback().messageDataReceived(msgs[i]);

 调用ListenCallback的方法,如下:

    public void messageDataReceived(ChannelMessage data) {
        if ( this.listener != null ) {
            if ( listener.accept(data) ) listener.messageReceived(data);
        }
    }
 

    这里的listener的实现类是:ChannelCoordinator,看代码是调用的父类的ChannelInterceptorBase的方法:

 

   public void messageReceived(ChannelMessage msg) {
        if (getPrevious() != null) getPrevious().messageReceived(msg);
    }
 

   这里的ChannelInterceptor还是前面说到的那两个,无关紧要,最终调用了GroupChannel的messageReceived()方法,具体的下一篇文章再说

 

 

 

 

分享到:
评论

相关推荐

    一台xp机器实现apache+tomcat session复制.docx

    【Apache + Tomcat Session 复制】:在一台运行Windows XP的计算机上,要实现Apache HTTP Server和Tomcat的负载均衡以及Session复制,通常需要使用Apache的mod_jk模块。这个过程涉及到Apache、Tomcat的配置,以及mod...

    tomcat5 session 复制

    【Tomcat5 Session 复制详解】 在分布式环境中,保持用户会话(Session)的一致性是至关重要的。Tomcat5 提供了集群(Cluster)功能来实现 Session 的复制,确保用户在不同服务器间的会话状态能够无缝切换。以下是...

    tomcat cluster 集群 session复制

    而实际情况下,采取Apache 加Tomcat进行负载均衡集群的时候,是可以不用将Session复制到所有的节点里, 比如有六个Tomcat实例 Tomcat1,Tomcat2,Tomcat3,Tomcat4,Tomcat5,Tomcat6 是可以配置成 三组互相复制...

    tomcat集群实现session复制

    在IT领域,特别是Web应用服务器的管理与优化中,Tomcat集群实现Session复制是一个关键的技术点,它确保了高可用性和负载均衡,特别是在处理大量并发请求的场景下。本文将深入探讨这一主题,涵盖其原理、配置方法以及...

    tomcat8实现session共享jar包

    此压缩包为tomcat8利用redis实现session共享所需要的jar包,包含(commons-pool2-2.6.0.jar、jedis-2.9.0.jar、tomcat-redis-session-manager.jar)直接将三个jar包复制Tomcat目录lib下面,在修改conf下context.xml...

    apache tomcat 集群 负责均衡 session复制

    NULL 博文链接:https://xueweiabcok.iteye.com/blog/1841448

    tomcat8+memcached session共享

    描述中提到的“nginx+tomcat8+memcached session共享所需jar包,直接放到tomcat/lib下即可”,暗示了实现这一功能需要一些特定的Java库(JAR包)。这些JAR包将集成到Tomcat的运行环境中,使Tomcat能够与Memcached...

    TOMCAT+APACHE集成以及session复制

    在这个"负载均衡和session复制"的压缩包中,可能包含了一个示例配置文件和指南,演示了如何配置TOMCAT和APACHE以实现负载均衡和session复制。这可能包括了Apache的虚拟主机配置、mod_proxy模块的启用、TOMCAT集群的...

    tomcat redis session.rar

    7. **优点**:使用Redis同步Session可以避免Session复制带来的网络开销,提高响应速度,同时增强了系统的可扩展性和可用性。 8. **挑战与注意事项**:需要注意Redis的高可用性(如主从复制、哨兵系统或Cluster),...

    tomcat 做session共享所需jar包压

    在分布式系统中,Session共享是一个常见的需求,尤其是在多个Tomcat服务器之间。这通常是由于负载均衡、高可用性或扩展性的需要。"Tomcat做session共享所需jar包"指的是实现这一功能所需的Java档案(JAR)文件。在这...

    tomcat-session同步所需jar.rar_session集群共享_tomcat session

    **Tomcat Session管理**:Tomcat提供了多种session管理策略,如`org.apache.catalina.ha.session.DeltaManager`和`org.apache.catalina.ha.session.JvmRouteBinderValve`,它们支持集群环境下的session复制。...

    nginx实现多个tomcat7直接session共享所需jar包

    2. 在Tomcat的`web.xml`配置文件中添加Session复制的相关配置,例如,引入Redis Session Manager的jar包,并配置相关的session-store-dir和manager类。 3. 配置Nginx,使用上游服务器块(upstream)定义Tomcat实例,...

    Tomcat5集群中的Session复制

    在Tomcat5集群中,Session复制是一个非常重要的概念。Session复制是指在多个服务器节点之间复制和同步Session数据,以确保在服务器故障或重启时,Session数据不丢失。这种机制可以提供高可靠性、可扩展性和高性能的...

    nginx tomcat集群 session复制

    【Nginx Tomcat集群与Session复制】 在高并发、高可用的Web应用环境中,使用Nginx作为负载均衡器,结合Tomcat集群是常见的架构选择。为了保证用户体验的连续性,Session复制技术用于在多个Tomcat实例之间共享用户...

    tomcat-session共享

    - 可以设置Session复制策略,例如只有在更新Session时才写入Redis,以减少网络开销。 总结来说,实现“tomcat-session共享”涉及到对Nginx负载均衡策略的理解,以及如何利用Redis这样的外部存储来实现跨服务器的...

    linux下用memcache做tomcat集群session复制

    在Linux环境下,使用Memcached实现Tomcat集群的session复制是一个常见的解决方案,以提高应用的可扩展性和高可用性。Memcached是一种分布式内存对象缓存系统,它可以存储包括session在内的临时数据,使得多个服务器...

    apache,tomcat负载均衡和session复制

    在多台Tomcat服务器之间进行session复制,意味着当一个用户在某台服务器上创建或更新session时,该session会被复制并分发到其他服务器,确保用户在集群中的任何一台服务器上都能访问到相同的状态信息。 1. **复制...

    tomcat-redis-session-manager的jar包-包含Tomcat7和Tomcat8

    此外,为了确保session数据的安全性和一致性,Tomcat-Redis-Session-Manager提供了多种策略,如session过期策略、session复制和故障转移。例如,可以设置session的超时时间,当用户长时间无操作时,session将在Redis...

Global site tag (gtag.js) - Google Analytics