`

应用jgroups实现tomcat session的同步

 
阅读更多

项目中采用将会话保存到全局缓存中的方式,实现了分布式session的功能。由于整个系统划分为多个子系统,并且分别部署到不同的服务器节点上,所以各tomcat容器之间的session的注销和过期时间需要同步。目前项目中采用了jgroups的组播方式实现了各容器之间session的同步问题,下面是部分代码,仅供参考:

 

import org.apache.catalina.Manager;
import org.apache.catalina.Session;
import org.apache.catalina.session.CacheConstants;
import org.apache.catalina.session.CacheSession;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.ExtendedReceiverAdapter;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.util.Util;


public class CacheMsgManager extends ExtendedReceiverAdapter{
	protected Log logger = LogFactory.getLog(CacheMsgManager.class);
	
	private static Manager manager = null;
	private static CacheMsgManager msgManager = null;
	private static String PROTOCOL_STACK = "";
	private static final String SESSION_GROUP = "sessionid-jgroup";
	private static JChannel channel;
	private List<String> msgList = new ArrayList<String>();//模拟状态对象,保存的是本节点的收到的消息
	
	
	public static CacheMsgManager getInstance(String ip){
		if(msgManager == null)
			msgManager = new CacheMsgManager(ip);
		
		return msgManager;
	}
	
	private CacheMsgManager(String ip){
		PROTOCOL_STACK = "UDP(mcast_addr="+ ip +";mcast_port=32767;ip_ttl=3;"+
			"mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"+
			"PING(timeout=2000;num_initial_members=3):"+
			"MERGE2(min_interval=5000;max_interval=10000):"+
			"FD_SOCK:"+
			"VERIFY_SUSPECT(timeout=1500):"+
			"pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"+
			"pbcast.STABLE(desired_avg_gossip=20000):"+
			"UNICAST(timeout=2500,5000):"+
			"FRAG:"+
			"pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false):"+
			"pbcast.STATE_TRANSFER";
		
		genChannel();
	}
	
	public static JChannel getChannel() {
		return channel;
	}
	
	
	/**
	 * 在有节点向本节点请求状态的时候被调用
	 */
	@Override
	public byte[] getState() {
		byte[] state = null;
		synchronized(msgList){
			try {
				state = Util.objectToByteBuffer(msgList);
			} catch (Exception e) {
				e.printStackTrace();
			}
			return state;
		}
	}
	
	/**
	 * 在其他节点返回状态给本节点的时候被调用
	 */
	@Override
	public void setState(byte[] state) {
		synchronized(msgList){
			try {
				List<String> tmpList = (List<String>)Util.objectFromByteBuffer(state);
				msgList.clear();
				msgList.addAll(tmpList);   
				logger.info("^^^^^^^^^^^^^^^^^^^ receive state:[");
				
				for(String msg : msgList){
					logger.info(msg);
				}
				logger.info("]");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 当有消息进来的时候被调用  
	 */  
	@Override
	public void receive(Message msg) {
		logger.info("^^^^^^^^^^^^^^^^^^^ new message receive from "+ msg.getSrc() +" : "+ msg.getObject());
		//msgList.add(msg.getSrc()+" send : "+ msg.getObject());
		
		String msgBody = "";
		String oper = "";
		
		if(msg != null){
			msgBody = (String)msg.getObject();
			String[] body = msgBody.split(",");
			
			if(body != null && body.length > 0){
				oper = body[0];
				
				String sessionId = null;
				String[] ids = null;
				
				if(body[1] != null){
					ids = body[1].split(":");
					if(ids != null)
						sessionId = ids[1];
				}
				
				logger.info("^^^^^^^^^^^^^^^^^^^ receive() sessionId : "+ sessionId);
				if(oper != null && oper.length() > 0){				
					
					if(oper.equals(CacheConstants.OPER_KEY_EXPIRE)){
						operExpire(sessionId);
					}
					
					if(oper.equals(CacheConstants.OPER_KEY_ACCESS)){
						operAccess(sessionId);
					}
				}
			}			
		}
	}
	
	
	private void genChannel(){
		try{
			if(channel == null){
				channel = new JChannel(PROTOCOL_STACK);//打开channel并指定配置文件
				channel.connect(SESSION_GROUP);//指定组名,就可以创建或连接到广播组
				channel.setReceiver(this);//注册回调接口,使用"推"模式来接受广播信息
				logger.info("^^^^^^^^^^^^^^^^^^^ current view:"+ channel.getView());//查看当前组成员
				channel.getState(null, 1000);//状态同步,第一个参数为null则向协调者节点获取信息
			}
			
		}catch(Exception e){
			logger.error("^^^^^^^^^^^^^^^^^^^ genChannel exception: "+ e.getMessage());
			e.printStackTrace();
		}
	}
	

	/**  
	 * 当组成员发生变化的时候被调用  
	 */  
	@Override
	public void viewAccepted(View new_view) {
		logger.info("^^^^^^^^^^^^^^^^^^^ new view receiver : "+ new_view);
	}

	public void start() throws ChannelException{
		//genChannel();
	}
	
	public void sendMessage(String oper, String id, String key, Object value){
		try {
			if(channel == null)
				genChannel();
			
			StringBuffer msgBuffer = new StringBuffer(oper +",");
			if(id != null && id.trim().length() > 0)
				msgBuffer.append("id:"+ id);
			
			if(key != null && key.trim().length() > 0){
				msgBuffer.append(",");
				msgBuffer.append("key:"+ key);
			}
			
			if(value != null)
				msgBuffer.append(",value:"+ value);//应用层存储到会话中的对象的toString()方法不能用逗号分割属性值			
			
			if(msgBuffer != null && msgBuffer.toString().length() > 0){
				Message message = new Message(null, null, msgBuffer.toString());
				channel.send(message);//发送消息
				//channel.close();
			}
			
		} catch (ChannelNotConnectedException cnce) {
			cnce.printStackTrace();
		} catch (ChannelClosedException cce) {
			cce.printStackTrace();
		}catch (ChannelException ce) {
			ce.printStackTrace();
		}catch (Exception e) {
			e.printStackTrace();
		}
	}	
	
	private void operExpire(String sessionId){
		logger.info("^^^^^^^^^^^^^^^^^^^ Begin msg oper : "+ CacheConstants.OPER_KEY_EXPIRE);
		//logger.info("^^^^^^^^^^^^^^^^^^^ tomcat manager : "+ getManager());
		
		try{
			if(sessionId != null && sessionId.trim().length() > 0){
				Manager manager = getManager();
				Session session = manager.findSession(sessionId);
				if(session != null)
					manager.remove(session);
			}
			
		}catch(Exception e){
			logger.error("^^^^^^^^^^^^^^^^^^^ "+ CacheConstants.OPER_KEY_EXPIRE 
					+" exceptions : "+ e.getMessage());
		}
		
		logger.info("^^^^^^^^^^^^^^^^^^^ End msg oper : "+ CacheConstants.OPER_KEY_EXPIRE);
	}
	
	private void operAccess(String sessionId){
		logger.info("^^^^^^^^^^^^^^^^^^^ Begin msg oper : "+ CacheConstants.OPER_KEY_ACCESS);
		
		try{
			if(sessionId != null && sessionId.trim().length() > 0){
				Manager manager = getManager();
				Session session = manager.findSession(sessionId);
				if(session != null){
					CacheSession mySession = (CacheSession)session;
					logger.info("^^^^^^^^^^^^^^^^^^^ mySession : "+ mySession);
					
					mySession.lastAccessedTime = mySession.thisAccessedTime;
					mySession.thisAccessedTime = System.currentTimeMillis();
			        
			        if (mySession.ACTIVITY_CHECK) {
			        	mySession.accessCount.incrementAndGet();
			        }
				}
			}
			
		}catch(Exception e){
			logger.error("^^^^^^^^^^^^^^^^^^^ "+ CacheConstants.OPER_KEY_ACCESS 
					+" exceptions : "+ e.getMessage());
		}
		
		logger.info("^^^^^^^^^^^^^^^^^^^ End msg oper : "+ CacheConstants.OPER_KEY_ACCESS);
	}
	
	
	public static Manager getManager(){
		return manager;
	}
	public static void setManager(Manager _manager){
		manager = _manager;
	}
}



 

0
0
分享到:
评论

相关推荐

    tomcat-session同步所需jar.rar_session集群共享_tomcat session

    "tomcat-session同步所需jar.rar_session集群共享_tomcat session"这个标题表明我们关注的是如何在Tomcat集群环境中实现session的共享。这涉及到多个知识点,包括session的基本概念、session复制、粘滞会话、以及...

    tomcat cluster 集群 session复制

    一直以来,我误解认为启动了n个tomcat,则Session需要同步复制到n个Tomcat中存在,因此在启动了6个以上的Tomcat,性能会大大下降。 而实际情况下,采取Apache 加Tomcat进行负载均衡集群的时候,是可以不用将Session...

    Tomcat通过自带的Cluster方式实现Session会话共享环境操作记录(个人精华版)

    总结起来,Tomcat的Cluster机制提供了强大的Session会话共享能力,通过合理的配置和使用,可以有效解决多服务器环境下的用户状态同步问题,提升Web应用的可用性和用户体验。在实际操作中,要结合具体业务需求和环境...

    apache的tomcat负载均衡(两个tomcat)和集群配置(session复制)

    在Tomcat集群中,为了保持用户会话在不同服务器间的同步,需要实现session复制。这可以通过以下方式实现: 1. **集群配置** - 在每个Tomcat实例的`server.xml`中,设置`&lt;Engine&gt;`、`&lt;Host&gt;`和`&lt;Cluster&gt;`元素。...

    apache,tomcat负载均衡和session复制

    2. **通信协议**:Apache和Tomcat之间,以及Tomcat服务器之间可能需要使用特定的通信协议来交换session信息,如JGroups或Java RMI。 3. **性能和复杂性**:session复制虽然提供了高可用性和用户体验的一致性,但也会...

    Tomcat 6 多机实现Session复制

    6. **Channel**元素:定义通信机制,例如`SimpleTcpCluster`或`JGroups`,用于服务器间的通信,实现Session数据的实时同步。 7. **Loader**元素:在集群环境中,你可能需要配置Loader来同步应用的类加载器,确保...

    tomcat集群配置文件

    在IT行业中,Tomcat是一个广泛使用的开源Web应用服务器,它实现了Java Servlet和JavaServer Pages(JSP)规范。当我们谈论“tomcat集群配置文件”时,我们指的是将多个Tomcat实例组织成一个集群,以提高应用程序的...

    Tomcat集群实例下载

    3. **通信机制**:Tomcat实例之间需要进行通信,以便同步session状态和其他关键信息。这可以通过Apache JGroups库或其他类似工具来实现。 4. **故障检测与恢复**:集群系统需要能够检测到实例的故障,并自动将请求...

    tomcat分布式

    3. **通信机制**:在分布式系统中,各个节点之间需要通信,Tomcat支持基于JGroups的集群通信,可以实现节点间的同步和状态共享。 4. **集群配置**:配置Tomcat集群涉及修改server.xml文件,添加`&lt;Cluster&gt;`元素,...

    tomcat集群优化详细配置

    1. **Tomcat自带的Cluster方式**:利用JGroups框架实现节点间的session同步。这种方式简单易用,但效率较低,不适合高并发环境。 2. **DNS轮询**:通过DNS服务器将请求轮流分配给不同的Tomcat实例。优点是配置简单...

    tomcat cluster参考资料3

    Tomcat Cluster是Tomcat服务器的一种高级特性,它允许多个Tomcat实例协同工作,共享session状态,并在节点间分散请求,从而实现负载均衡和故障转移。本参考资料3将深入探讨Tomcat集群的相关知识点。 【一、Tomcat...

    tomcat集群

    9. **集群通信协议**:Tomcat使用JGroups库进行集群内的通信,确保实例间的信息同步和会话复制。 10. **虚拟主机**:在集群环境中,可以配置多个虚拟主机,使得不同的域名或应用可以运行在同一组Tomcat实例上,充分...

    tomcat集群与会话共享

    在Tomcat集群中,服务器之间的通信通常依赖于Java的JGroups库,它提供了一套可靠的组通信协议,用于成员发现、消息传递和故障检测。 **5. 配置注意事项** - 为了保证会话复制的正确性,应用中的会话对象需要序列化...

    tomcat集群部署.

    在Tomcat集群中,多台Tomcat服务器共享相同的Web应用,能够互相备份,实现负载均衡和故障转移。 2. **集群的优点**: - **负载均衡**:通过将请求分散到多个服务器上,防止单一服务器过载。 - **高可用性**:如果...

    apache tomcat集群

    在Tomcat中,集群可以通过网络通信来同步session数据。 2. **配置集群**:要设置Apache Tomcat集群,首先需要在`server.xml`配置文件中定义每个Tomcat实例的`Engine`、`Host`和`Cluster`元素。`Cluster`元素用于...

    Tomcat集群资料

    在Java Web应用服务器领域,Apache Tomcat以其轻量级、高效和开源的特点,被广泛用于部署Servlet和JSP应用。然而,随着业务的增长,单台Tomcat服务器可能无法满足高并发和高可用性的需求,这时我们就需要引入集群...

    Tomcat 集群负载均衡

    JGroups是用于组播和集群间通信的库,它负责节点间的同步和消息传递。 ```xml ... &lt;Valve className="org.apache.catalina.ha.session.DeltaRequestValve"/&gt; &lt;Valve className="org.apache.catalina.ha....

    tomcat集群部署

    2. **基于内存的session复制**:使用TCP或JGroups等协议,在节点之间直接交换session数据。 3. **使用应用级解决方案**:如Spring Session或Redis,提供更高效、可靠的session共享和管理。 **五、监控与维护** 1. ...

    有图有真相。windows环境下配置tomat+redis+nginx集群共享session

    - Tomcat使用`JGroups`库进行集群间的通信,确保Session更新同步。 - 需要在`server.xml`的`Channel`元素中配置`JGroups`的配置文件(如`jgroups.xml`)。 6. **Nginx负载均衡策略**: - Nginx支持多种负载均衡...

    Apache2负载均衡+Tomcat6集群

    2. **通信机制**:Tomcat集群使用一种称为`JGroups`的协议来实现节点间的通信,确保session数据的一致性。JGroups提供多种传输层协议,如UDP、TCP等,可以根据网络环境选择。 3. **session复制**:每个Tomcat实例都...

Global site tag (gtag.js) - Google Analytics