`
nod0620
  • 浏览: 20006 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

tomcat session复制(二)

阅读更多

      上文http://nod0620.iteye.com/admin/blogs/1030398 写了session复制的发送部分,继续接收部分:

当接收方tomcat接收到需要session的消息时,最终调用了GroupChannel的messageReceived()方法

    public void messageReceived(ChannelMessage msg) {
        if ( msg == null ) return;
        try {
            if ( Logs.MESSAGES.isTraceEnabled() ) {
                Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
            }

            Serializable fwd = null;
            if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
                fwd = new ByteMessage(msg.getMessage().getBytes());
            } else {
                try {
                    fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength());
                }catch (Exception sx) {
                    log.error("Unable to deserialize message:"+msg,sx);
                    return;
                }
            }
            if ( Logs.MESSAGES.isTraceEnabled() ) {
                Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
            }

            //get the actual member with the correct alive time
            Member source = msg.getAddress();
            boolean rx = false;
            boolean delivered = false;
            for ( int i=0; i<channelListeners.size(); i++ ) {
                ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
                if (channelListener != null && channelListener.accept(fwd, source)) {
                    channelListener.messageReceived(fwd, source);
                    delivered = true;
                    //if the message was accepted by an RPC channel, that channel
                    //is responsible for returning the reply, otherwise we send an absence reply
                    if ( channelListener instanceof RpcChannel ) rx = true;
                }
            }//for
            if ((!rx) && (fwd instanceof RpcMessage)) {
                //if we have a message that requires a response,
                //but none was given, send back an immediate one
                sendNoRpcChannelReply((RpcMessage)fwd,source);
            }
            if ( Logs.MESSAGES.isTraceEnabled() ) {
                Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
            }

        } catch ( Exception x ) {
            //this could be the channel listener throwing an exception, we should log it 
            //as a warning.
            if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
            throw new RemoteProcessException("Exception:"+x.getMessage(),x);
        }
    }

    可以看到,这里主要是调用ChannelListener的messageReceived方法,显然这个实现类就是SimpleTcpCluster:

 

 public void messageReceived(Serializable message, Member sender) {
        ClusterMessage fwd = (ClusterMessage)message;
        fwd.setAddress(sender);
        messageReceived(fwd);
    }

    public void messageReceived(ClusterMessage message) {

        long start = 0;
        if (log.isDebugEnabled() && message != null)
            log.debug("Assuming clocks are synched: Replication for "
                    + message.getUniqueId() + " took="
                    + (System.currentTimeMillis() - (message).getTimestamp())
                    + " ms.");

        //invoke all the listeners
        boolean accepted = false;
        if (message != null) {
            for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) {
                ClusterListener listener = (ClusterListener) iter.next();
                if (listener.accept(message)) {
                    accepted = true;
                    listener.messageReceived(message);
                }
            }
        }
        if (!accepted && log.isDebugEnabled()) {
            if (notifyLifecycleListenerOnFailure) {
                Member dest = message.getAddress();
                // Notify our interested LifecycleListeners
                lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT,
                        new SendMessageData(message, dest, null));
            }
            log.debug("Message " + message.toString() + " from type "
                    + message.getClass().getName()
                    + " transfered but no listener registered");
        }
        return;
    }

   其实这里调用的是ClusterListener的messageReceived()方法,默认的两个ClusterListener是ClusterSessionListener和JvmRouteSessionIDBinderListener,JvmRouteSessionIDBinderListener只有在发送的消息是关于session  id改变的消息时才起作用,重点看ClusterSessionListener.messageReceived()方法,其实是调用到了DeltaManager的messageDataReceived()方法:

    public void messageDataReceived(ClusterMessage cmsg) {
        if (cmsg != null && cmsg instanceof SessionMessage) {
            SessionMessage msg = (SessionMessage) cmsg;
            switch (msg.getEventType()) {
                case SessionMessage.EVT_GET_ALL_SESSIONS:
                case SessionMessage.EVT_SESSION_CREATED: 
                case SessionMessage.EVT_SESSION_EXPIRED: 
                case SessionMessage.EVT_SESSION_ACCESSED:
                case SessionMessage.EVT_SESSION_DELTA:
                case SessionMessage.EVT_CHANGE_SESSION_ID: {
                    synchronized(receivedMessageQueue) {
                        if(receiverQueue) {
                            receivedMessageQueue.add(msg);
                            return ;
                        }
                    }
                   break;
                }
                default: {
                    //we didn't queue, do nothing
                    break;
                }
            } //switch
            
            messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
        }
    }
 

     这里的事件是EVT_GET_ALL_SESSIONS,在messageReceived()方法中有这个分支的处理代码,最后面调用

handleGET_ALL_SESSIONS()方法:

 protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
        counterReceive_EVT_GET_ALL_SESSIONS++;
        //get a list of all the session from this manager
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
        // Write the number of active sessions, followed by the details
        // get all sessions and serialize without sync
        Session[] currentSessions = findSessions();
        long findSessionTimestamp = System.currentTimeMillis() ;
        if (isSendAllSessions()) {
            sendSessions(sender, currentSessions, findSessionTimestamp);
        } else {
            // send session at blocks
            for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
                int len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
                Session[] sendSessions = new Session[len];
                System.arraycopy(currentSessions, i, sendSessions, 0, len);
                sendSessions(sender, sendSessions,findSessionTimestamp);
                if (getSendAllSessionsWaitTime() > 0) {
                    try {
                        Thread.sleep(getSendAllSessionsWaitTime());
                    } catch (Exception sleep) {
                    }
                }//end if
            }//for
        }//end if
        
        SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
        newmsg.setTimestamp(findSessionTimestamp);
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
        cluster.send(newmsg, sender);
    }

   主要是先找到所有的session,然后发送所有的session,然后发送一个标志结束的消息表示发送完成。这样第一次tomcat的启动加入集群是拿所用的session就完成了,主要的流程是这样的:

 

发送方:

StandardContext.start()--->SimpleTcpCluster.createManager()--->StandardContext.setManager()--->

DeltaManager.start()--->DeltaManager.getAllClusterSessions()--->SimpleTcpCluster.send()

---->GroupChannel.send()---->ParallelNioSender.sendMessage()--->NioSender.process()

 

接收方:

NioReceiver.listen()--->NioReceiver.readDataFromSocket()---->NioReplicationTask.drainChannel()--->

ListenCallback.messageDataReceived()---->ChannelCoordinator.messageReceived()--->

GroupChannel.messageReceived()--->SimpleTcpCluster.messageReceived()--->

DeltaManager.messageDataReceived()---->DeltaManager.handleGET_ALL_SESSIONS()

 

接收方准备好数据又要发送给发送方,简单来说通信是这样:

发送方发送请求--->接收方接受请求,发送数据,一个是session的数据,一个是session发送完毕的数据--->

发送方收到接收方过来的两种数据

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    一台xp机器实现apache+tomcat session复制.docx

    **Session复制**是通过JK模块在Apache和Tomcat之间传递Session信息,使得用户在集群中的不同Tomcat服务器之间切换时,仍能保持登录状态和其他会话信息。这通常需要在`workers.properties`文件中进行详细配置,并且在...

    tomcat5 session 复制

    【Tomcat5 Session 复制详解】 在分布式环境中,保持用户会话(Session)的一致性是至关重要的。Tomcat5 提供了集群(Cluster)功能来实现 Session 的复制,确保用户在不同服务器间的会话状态能够无缝切换。以下是...

    tomcat cluster 集群 session复制

    而实际情况下,采取Apache 加Tomcat进行负载均衡集群的时候,是可以不用将Session复制到所有的节点里, 比如有六个Tomcat实例 Tomcat1,Tomcat2,Tomcat3,Tomcat4,Tomcat5,Tomcat6 是可以配置成 三组互相复制...

    tomcat集群实现session复制

    在IT领域,特别是Web应用服务器的管理与优化中,Tomcat集群实现Session复制是一个关键的技术点,它确保了高可用性和负载均衡,特别是在处理大量并发请求的场景下。本文将深入探讨这一主题,涵盖其原理、配置方法以及...

    tomcat8实现session共享jar包

    此压缩包为tomcat8利用redis实现session共享所需要的jar包,包含(commons-pool2-2.6.0.jar、jedis-2.9.0.jar、tomcat-redis-session-manager.jar)直接将三个jar包复制Tomcat目录lib下面,在修改conf下context.xml...

    apache tomcat 集群 负责均衡 session复制

    NULL 博文链接:https://xueweiabcok.iteye.com/blog/1841448

    tomcat redis session.rar

    7. **优点**:使用Redis同步Session可以避免Session复制带来的网络开销,提高响应速度,同时增强了系统的可扩展性和可用性。 8. **挑战与注意事项**:需要注意Redis的高可用性(如主从复制、哨兵系统或Cluster),...

    tomcat8+memcached session共享

    标题中的“tomcat8+memcached session共享”指的是在Tomcat 8服务器中利用Memcached进行session共享的技术实践。在分布式系统中,session共享是一个重要的问题,因为用户在访问不同的服务器节点时,需要保持登录状态...

    nginx tomcat集群 session复制

    【Nginx Tomcat集群与Session复制】 在高并发、高可用的Web应用环境中,使用Nginx作为负载均衡器,结合Tomcat集群是常见的架构选择。为了保证用户体验的连续性,Session复制技术用于在多个Tomcat实例之间共享用户...

    TOMCAT+APACHE集成以及session复制

    在IT行业中,尤其是在Web服务器配置和优化领域,TOMCAT和APACHE的集成以及session复制是两个关键概念。本文将详细讲解这两个主题,并提供一个基于实际操作的demo概述。 首先,TOMCAT是一款流行的开源Java Servlet...

    tomcat 做session共享所需jar包压

    在分布式系统中,Session共享是一个常见的需求,尤其是在多个Tomcat服务器之间。这通常是由于负载均衡、高可用性或扩展性的需要。"Tomcat做session共享所需jar包"指的是实现这一功能所需的Java档案(JAR)文件。在这...

    tomcat-session同步所需jar.rar_session集群共享_tomcat session

    **Tomcat Session管理**:Tomcat提供了多种session管理策略,如`org.apache.catalina.ha.session.DeltaManager`和`org.apache.catalina.ha.session.JvmRouteBinderValve`,它们支持集群环境下的session复制。...

    nginx实现多个tomcat7直接session共享所需jar包

    2. 在Tomcat的`web.xml`配置文件中添加Session复制的相关配置,例如,引入Redis Session Manager的jar包,并配置相关的session-store-dir和manager类。 3. 配置Nginx,使用上游服务器块(upstream)定义Tomcat实例,...

    Tomcat5集群中的Session复制

    Tomcat5集群中的Session复制 在Tomcat5集群中,Session复制是一个非常重要的概念。Session复制是指在多个服务器节点之间复制和同步Session数据,以确保在服务器故障或重启时,Session数据不丢失。这种机制可以提供...

    tomcat-session共享

    - 可以设置Session复制策略,例如只有在更新Session时才写入Redis,以减少网络开销。 总结来说,实现“tomcat-session共享”涉及到对Nginx负载均衡策略的理解,以及如何利用Redis这样的外部存储来实现跨服务器的...

    linux下用memcache做tomcat集群session复制

    在Linux环境下,使用Memcached实现Tomcat集群的session复制是一个常见的解决方案,以提高应用的可扩展性和高可用性。Memcached是一种分布式内存对象缓存系统,它可以存储包括session在内的临时数据,使得多个服务器...

    apache,tomcat负载均衡和session复制

    当我们谈论"Apache,tomcat负载均衡和session复制"时,这意味着我们要探讨如何在多台服务器之间分配负载,并确保用户会话的无缝迁移和一致性。 **负载均衡**是解决高并发、高可用性问题的重要策略。它通过将来自...

    tomcat-redis-session-manager的jar包-包含Tomcat7和Tomcat8

    此外,为了确保session数据的安全性和一致性,Tomcat-Redis-Session-Manager提供了多种策略,如session过期策略、session复制和故障转移。例如,可以设置session的超时时间,当用户长时间无操作时,session将在Redis...

Global site tag (gtag.js) - Google Analytics