`
jzkangta
  • 浏览: 160503 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

利用JGroups同步两台server之间的cache(转)

阅读更多
原文地址:http://www.blogjava.net/swingboat/archive/2007/07/16/130565.html

一、需求
前段时间做了一个项目,在后台有很多的数据都放入到了cache中了,而且还会对cache中的数据进行更新。如果只有一台server 没有任何问题,但是如果考虑到集群负载平衡,连接多个server的时候,就有问题出现了,怎么样才能保证多个server之间cache的同步呢?

二、引入JGroups
JGroups是一个可靠的组间通讯工具,进程可以加入一个通讯组,给组内所有的成员或单独的成员发送消息,同样,也可以从组中的成员处接收消息。
系统会记录组的每一个成员,在新成员加入或是现有的成员离开或是崩溃时,会通知组内的其他成员。

当我们更新一台server上的cache的时候,利用JGroups进行广播,其他的server接收到广播,根据接收到的信息来更新自己的cache,这样达到了
每个server的cache同步。

三、实现
1、定义一个接口BaseCache规定出对cache类操作的方法
public interface BaseCache {
    
    public void put(String key, Object ob);
    public Object get(String key);
    
    public void delete(String key);
    public void batchDelete(String[] list);
    public void batchDelete(List list);
    public void deleteAll();
    
}

2、定义一个同步器(CacheSynchronizer),这个类利用JGroups进行发送广播和接收广播
public class CacheSynchronizer {
    private static String protocolStackString=
        "UDP(mcast_addr=235.11.17.19;mcast_port=32767;ip_ttl=3;"+
        "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"+
        "PING(timeout=2000;num_initial_members=3):"+
        "MERGE2(min_interval=5000;max_interval=10000):"+
        "FD_SOCK:"+
        "VERIFY_SUSPECT(timeout=1500):"+
        "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"+
        "pbcast.STABLE(desired_avg_gossip=20000):"+
        "UNICAST(timeout=2500,5000):"+
        "FRAG:"+
        "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false)";
    private static String groupName="YHPTEST";
    
    private Channel jgroupsChannel=null;
    
    //inner class ,定义接收广播,已经对接收到的广播进行处理
    private class ReceiveCallback extends ExtendedReceiverAdapter{
        private BaseCache cache=null;
        public void setCache(BaseCache baseCache){//设置cache类
            cache=baseCache;
        }
        public void receive(Message msg) {
            if(cache==null) return ;
            String strMsg = (String) msg.getObject();
            if(strMsg!=null&&(!"".equals(strMsg))){
                cache.put(strMsg, strMsg);    //根据接收到的广播,同步cache
            }
        }
    }
    
    private ReceiveCallback recvCallback = null;
    
    public CacheSynchronizer(BaseCache cache) throws Exception{
        jgroupsChannel = new JChannel(protocolStackString);
        recvCallback = new ReceiveCallback();
        recvCallback.setCache(cache);
        jgroupsChannel.setReceiver(recvCallback);
        jgroupsChannel.connect(groupName);
    }
    
    /** *//**
     * 发送广播信息,我们可以自定义广播的格式。
     * 这里简单起见,仅仅发送一个字符串
     * @param sendMsg
     * @throws Exception
     */
    public void sendCacheFlushMessage(String sendMsg)throws Exception {
        jgroupsChannel.send(null, null, sendMsg); //发送广播
        
    }
}

3、定义cache类,调用同步器同步cache
public class TestDataCache implements BaseCache {
    private Map dataCache=null;//保持cache数据
    private CacheSynchronizer cacheSyncer = null; //同步器
    
    //inner class for thread safe.
    private static final class TestDataCacheHold{
        private static TestDataCache  theSingleton=new TestDataCache();        
        public static TestDataCache getSingleton(){
            return theSingleton;
        }
        private TestDataCacheHold(){}
    }
    
    //Prevents to inherit
    private TestDataCache(){
        dataCache=new HashMap();
        createSynchronizer();
    }
    
    public static TestDataCache getInstance(){
        return TestDataCacheHold.getSingleton();
    }
    
    public CacheSynchronizer getSynchronizer(){
        return cacheSyncer;
    }
    
    public int getCacheLength(){
        return dataCache.size();
    }
    
    public void createSynchronizer(){
        try{
            cacheSyncer=new CacheSynchronizer(this);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    public void batchDelete(String[] list) {
        if(list!=null) return ;
        synchronized (dataCache){
            for(int i=0;i<list.length;i++){
                if(list[i].length()>0){
                    dataCache.remove(list[i]);
                }
            }
        }

    }

    public void batchDelete(List list) {
        synchronized (dataCache){
            Iterator itor=list.iterator();
            while(itor.hasNext()){
                String tmpKey=(String)itor.next();
                if(tmpKey.length()>0){
                    dataCache.remove(tmpKey);
                }
            }
        }
    }

    public void delete(String key) {
        synchronized (dataCache) {
            dataCache.remove(key);
        }
    }

    public void deleteAll() {
        synchronized (dataCache){
            dataCache.clear();
        }

    }

    public Object get(String key) {
        Object theObj=null;
        synchronized (dataCache) {
            theObj =dataCache.get(key);
        }
        return theObj;
    }

    public void put(String key, Object obj) {
        Object theObj=null;
        synchronized (dataCache){
            theObj=dataCache.get(key);
            if(theObj==null){
                dataCache.put(key, obj);
            }else{
                theObj=obj;
            }
        }
    }


4、更新cache,测试是否同步
Scanner    cin=new    Scanner(System.in);
        String input=cin.next().trim();
        TestDataCache cache=TestDataCache.getInstance();
        while(!"q".equalsIgnoreCase(input)){            
            if(!"".equals(input)){
                cache.put(input, input);
                cache.getSynchronizer().sendCacheFlushMessage(input);
            }            
            System.out.println(cache.getCacheLength());
            input=cin.next();

打开两个Eclipse,run此程序,输入测试数据,控制台显示同步后cache的长度。

四、引申
在此实例中,我们为简单起见仅仅考虑了新增对cache的同步,如果是个真正的项目,这显然是不够的。这样我们就必须定义出消息的格式,例如操作的对象,操作的命令等等。根据消息的定义来执行同步数据的操作。

五、附录
Multicast是一种同时像多台机器发送数据的机制。
Multicast使用224.0.0.0 到 239.255.255.255 这段IP来传送数据,这段IP地址是保留的,发送到这上面的数据不会通过你的子网转发。
在RFC-1060中定义了一部分预留的组播地址,使用时应注意不要重复。

一些比较特别的组播地址:(更多内容请查看RFC-1060)
1) 224.0.0.0 这个是保留地址,不会被指定到任何的组播组
2) 224.0.0.1 这个地址在所有的主机上被指定为一个永久组播组,这个地址可以用来找到本地子网内所有的组播主机。
使用ping 224.0.0.1可以查看这些地址

在一个组播中的所有主机使用一个相同的组播地址,它们被称为一个组(Group),组中的成员是动态的,他们可以随时加入或者离开组。每台主机可以同时是多个组的成员,也可以不属于任何一个组。比较特别的是,并不是只有组中的成员才可以给组发送数据。
组分为两种,一种是永久性的,一种是动态的。对于永久性的组,他们拥有一个众所周知的管理IP地址,这个地址不是组中的成员,它是永久的。永久性的组可以拥有任何数量的成员,甚至没有成员。而动态组只有在拥有成员的时候才存在。JGroups使用的就是动态组来实现组播数据的。

六、参考
http://renex.spaces.live.com/blog/cns!93BE33C757C385AE!280.entry?_c=BlogPart
http://puras.iteye.com/blog/81783

七、备注
在linux下,如果要调用JGroups,启动Tomcat时,必须修改catalina.sh,增加下面的参数:
JAVA_OPTS=" -Djava.net.preferIPv4Stack=true  "
请参考下面的链接:
http://weblogs.java.net/blog/dcengija/archive/2006/04/jgroups_demos_o.html
分享到:
评论

相关推荐

    JavaEE源代码 jgroups-2.2.8

    JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 ...

    Jgroups 教程

    通过这个通道,节点之间可以发送消息、同步状态等。 ##### 2.2 创建通道并加入群集 首先,需要创建一个JGroups通道,并指定配置文件。然后,调用`connect`方法加入到指定的群集中。 ##### 2.3 主事件循环与发送...

    Ehcache通过Jgroups做集群

    测试集群配置,可以启动多台虚拟机或实际的服务器,分别运行配置好的应用,然后观察Ehcache是否能够正确地在集群中同步数据。可以通过添加、更新或删除缓存项,然后检查其他节点是否也同步了这些变更。 在`...

    jgroups-2.2.7.jar

    jgroups-2.2.7.jar jgroups-2.2.7.jar

    jgroups.part1

    jgroups.part1

    JGroups的Raft实现jgroups-raft.zip

    jgroups-raft 项目是 JGroups 框架对 Raft 的实现。Maven:&lt;groupId&gt;org.jgroups &lt;artifactId&gt;jgroups-raft &lt;version&gt;0.2&lt;/version&gt;Raft 是一个容易理解的共识算法。在容错和性能方面它相当于 Paxos(Google 的一致...

    jgroups源代码

    《深入解析JGroups开源框架:基于belaban-JGroups-19d7183源代码》 JGroups是一个用于构建高可用性集群的Java框架,它提供了可靠的消息传递、组成员管理和故障检测等功能,广泛应用于分布式系统中。本文将基于bela...

    jgroups-3.0.2

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说...JGroups 适合使用场合服务器集群cluster、多服务器通讯、服务器replication(复制)、分布式cache缓存等

    jgroups

    ### 关于JGroups 2.5教程:安装与开发简易应用程序 #### 安装与配置JGroups **JGroups**是一款高性能、可扩展且高度可靠的群集通信库,旨在为分布式系统提供消息传递功能。本教程将深入探讨如何安装配置JGroups,...

    JGroups-jdk.zip_jgroups

    在本文中,我们将深入探讨JGroups的核心特性、工作原理以及如何利用它来创建可靠的分布式系统。 首先,JGroups的核心功能在于它的组通信能力。一个“组”是由多个成员组成的逻辑集合,每个成员都可以向组内的其他...

    jgroups官方帮助文档html格式打包2.X版本

    此文档主要针对JGroups 2.X版本的官方帮助文档进行详细解读,旨在帮助开发者深入理解并有效地利用JGroups。 一、JGroups简介 JGroups的核心目标是确保在分布式环境中数据的一致性。它提供了一套完整的工具,用于...

    encache+jgroups集群缓存共享

    "encache+jgroups集群缓存共享"这个主题聚焦于如何利用EnCache和JGroups两个技术来实现高效的集群间缓存共享。 EnCache是一个高性能、分布式、内存中的键值存储系统,通常用于缓存应用程序的数据,以减少对数据库的...

    jgroups.part3

    jgroups.part3

    Jgroups中的UNICAST3协议中文翻译

    Jgroups 中的 UNICAST3 协议详解 Jgroups 是一种基于 IP 多播的可靠的组播中间件,UNICAST3 协议是 Jgroups 中的一种单播协议,旨在保持单播和 UNICAST2 的正面特征,而修正负面特征。 UNICAST3 协议的主要特点是...

    jgroups-3.2

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说必须要使用IP Multicast,JGroups也可以使用TCP来实现)。其工作模式基于IP多播,但可以在可靠性和群组...

    Ecahche+Jgroups

    Ehcache和JGroups是两个在分布式系统中广泛使用的开源技术。Ehcache是一个高性能、易用的Java本地缓存解决方案,而JGroups则是一个用于组通信的框架,专门处理集群中的节点间通信问题。当这两者结合时,可以构建出...

    Java多播通讯框架 JGroups

    Java多播通讯框架JGroups是Java开发者用于构建高可用、高性能和可伸缩的集群通信系统的重要工具。它提供了一套全面的协议栈,能够处理网络中的节点发现、消息传递、故障检测和恢复等问题,从而使得开发分布式应用变...

Global site tag (gtag.js) - Google Analytics