`
qq123zhz
  • 浏览: 534513 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

oscache 集群和数据同步

    博客分类:
  • java
 
阅读更多
cache.event.listeners=com.test.JavaGroupsBroadcastingListenerImpl
cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;\
mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
PING(timeout=2000;num_initial_members=3):\
MERGE2(min_interval=5000;max_interval=10000):\
FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
UNICAST(timeout=300,600,1200,2400):\
pbcast.STABLE(desired_avg_gossip=20000):\
FRAG(frag_size=8096;down_thread=false;up_thread=false):\
pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
cache.cluster.multicast.ip=231.12.21.132

 oscache 集群同步数据需要使用到JavaGroupsBroadcastingListener,但是JavaGroupsBroadcastingListener 具体没有实现同步的功能,JavaGroupsBroadcastingListener:

 public void handleNotification(Serializable serializable) {
        if (!(serializable instanceof ClusterNotification)) {
            log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");

            return;
        }

        handleClusterNotification((ClusterNotification) serializable);
    }

 handleNotification该方法调用 handleClusterNotification((ClusterNotification) serializable)后,进入AbstractBoardcasting类,改方法只执行刷新当前缓存的内容,对集群数据同步没有实现。

/*
 * Copyright (c) 2002-2003 by OpenSymphony
 * All rights reserved.
 */
package com.opensymphony.oscache.plugins.clustersupport;

import com.opensymphony.oscache.base.*;
import com.opensymphony.oscache.base.events.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Date;

/**
 * Implementation of a CacheEntryEventListener. It broadcasts the flush events
 * across a cluster to other listening caches. Note that this listener cannot
 * be used in conjection with session caches.
 *
 * @version        $Revision: 254 $
 * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
 */
public abstract class AbstractBroadcastingListener implements CacheEntryEventListener, LifecycleAware {
    private final static Log log = LogFactory.getLog(AbstractBroadcastingListener.class);

    /**
     * The name to use for the origin of cluster events. Using this ensures
     * events are not fired recursively back over the cluster.
     */
    protected static final String CLUSTER_ORIGIN = "CLUSTER";
    protected Cache cache = null;

    public AbstractBroadcastingListener() {
        if (log.isInfoEnabled()) {
            log.info("AbstractBroadcastingListener registered");
        }
    }

    /**
     * Event fired when an entry is flushed from the cache. This broadcasts
     * the flush message to any listening nodes on the network.
     */
    public void cacheEntryFlushed(CacheEntryEvent event) {
        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
            if (log.isDebugEnabled()) {
                log.debug("cacheEntryFlushed called (" + event + ")");
            }

            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_KEY, event.getKey()));
        }
    }

    /**
     * Event fired when an entry is removed from the cache. This broadcasts
     * the remove method to any listening nodes on the network, as long as
     * this event wasn't from a broadcast in the first place.
     */
    public void cacheGroupFlushed(CacheGroupEvent event) {
        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
            if (log.isDebugEnabled()) {
                log.debug("cacheGroupFushed called (" + event + ")");
            }

            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_GROUP, event.getGroup()));
        }
    }

    public void cachePatternFlushed(CachePatternEvent event) {
        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
            if (log.isDebugEnabled()) {
                log.debug("cachePatternFushed called (" + event + ")");
            }

            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_PATTERN, event.getPattern()));
        }
    }

    public void cacheFlushed(CachewideEvent event) {
        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
            if (log.isDebugEnabled()) {
                log.debug("cacheFushed called (" + event + ")");
            }

            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_CACHE, event.getDate()));
        }
    }

    // --------------------------------------------------------
    // The remaining events are of no interest to this listener
    // --------------------------------------------------------
    public void cacheEntryAdded(CacheEntryEvent event) {
    }

    public void cacheEntryRemoved(CacheEntryEvent event) {
    }

    public void cacheEntryUpdated(CacheEntryEvent event) {
    }

    public void cacheGroupAdded(CacheGroupEvent event) {
    }

    public void cacheGroupEntryAdded(CacheGroupEvent event) {
    }

    public void cacheGroupEntryRemoved(CacheGroupEvent event) {
    }

    public void cacheGroupRemoved(CacheGroupEvent event) {
    }

    public void cacheGroupUpdated(CacheGroupEvent event) {
    }

    /**
     * Called by the cache administrator class when a cache is instantiated.
     *
     * @param cache the cache instance that this listener is attached to.
     * @param config The cache's configuration details. This allows the event handler
     * to initialize itself based on the cache settings, and also to receive <em>additional</em>
     * settings that were part of the cache configuration but that the cache
     * itself does not care about. If you are using <code>cache.properties</code>
     * for your configuration, simply add any additional properties that your event
     * handler requires and they will be passed through in this parameter.
     *
     * @throws InitializationException thrown when there was a problem initializing the
     * listener. The cache administrator will log this error and disable the listener.
     */
    public void initialize(Cache cache, Config config) throws InitializationException {
        this.cache = cache;
    }

    /**
     * Handles incoming notification messages. This method should be called by the
     * underlying broadcasting implementation when a message is received from another
     * node in the cluster.
     *
     * @param message The incoming cluster notification message object.
     */
    public void handleClusterNotification(ClusterNotification message) {
        if (cache == null) {
            log.warn("A cluster notification (" + message + ") was received, but no cache is registered on this machine. Notification ignored.");

            return;
        }

        if (log.isInfoEnabled()) {
            log.info("Cluster notification (" + message + ") was received.");
        }

        switch (message.getType()) {
            case ClusterNotification.FLUSH_KEY:
                cache.flushEntry((String) message.getData(), CLUSTER_ORIGIN);
                break;
            case ClusterNotification.FLUSH_GROUP:
                cache.flushGroup((String) message.getData(), CLUSTER_ORIGIN);
                break;
            case ClusterNotification.FLUSH_PATTERN:
                cache.flushPattern((String) message.getData(), CLUSTER_ORIGIN);
                break;
            case ClusterNotification.FLUSH_CACHE:
                cache.flushAll((Date) message.getData(), CLUSTER_ORIGIN);
                break;
            default:
                log.error("The cluster notification (" + message + ") is of an unknown type. Notification ignored.");
        }
    }

    /**
     * Called when a cluster notification message is to be broadcast. Implementing
     * classes should use their underlying transport to broadcast the message across
     * the cluster.
     *
     * @param message The notification message to broadcast.
     */
    abstract protected void sendNotification(ClusterNotification message);
}

 
所以我们要集群式同步数据,必须实现这三个方法:

public void cacheEntryAdded(CacheEntryEvent event) {
    }

    public void cacheEntryRemoved(CacheEntryEvent event) {
    }

    public void cacheEntryUpdated(CacheEntryEvent event) {
    }

 这个实现类实现了以上的三个方法,就可以同步数据了:

package com.test;
import com.opensymphony.oscache.base.events.CacheEntryEvent;
import com.opensymphony.oscache.plugins.clustersupport.ClusterNotification;
import com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener;

public class JavaGroupsBroadcastingListenerImpl extends
		JavaGroupsBroadcastingListener {
	public void handleClusterNotification(ClusterNotification message) {
		
		switch (message.getType()) {
		case CacheConstants.CLUSTER_ENTRY_ADD:
			System.out.println("集群新增:" + message.getData());
			if(message.getData() instanceof QflagCacheEvent) {
				QflagCacheEvent event = (QflagCacheEvent)message.getData();
				cache.putInCache(event.getKey(), event.getEntry().getContent(),null,null,CLUSTER_ORIGIN);
			}
			break;
		case CacheConstants.CLUSTER_ENTRY_UPDATE:
			System.out.println("集群更新:" + message.getData());
			if(message.getData() instanceof QflagCacheEvent) {
				QflagCacheEvent event = (QflagCacheEvent)message.getData();
//				cache.flushEntry(event.getKey());
				cache.putInCache(event.getKey(), event.getEntry().getContent(),null,null,CLUSTER_ORIGIN);
			}
			break;
		case CacheConstants.CLUSTER_ENTRY_DELETE:
			System.out.println("集群删除:" + message.getData());
			if(message.getData() instanceof QflagCacheEvent) {
				QflagCacheEvent event = (QflagCacheEvent)message.getData();
//				cache.removeEntry(event.getKey(),event.getOrigin());
				cache.removeEntry(event.getKey());
			}
			break;
		}

	}
	
	public void cacheEntryAdded(CacheEntryEvent event) {
		super.cacheEntryAdded(event);
		System.out.println("属性添加");
		if(!CLUSTER_ORIGIN.equals(event.getOrigin())) {
			sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_ADD, new QflagCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));
		}
	}

//	@Override
//	public void cacheEntryFlushed(CacheEntryEvent event) {
//		
//		super.cacheEntryFlushed(event);
//		if(!CLUSTER_ORIGIN.equals(event.getOrigin())) {
//			sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_ADD, new UcallCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));
//		}
//	}

	@Override
	public void cacheEntryRemoved(CacheEntryEvent event) {
		System.out.println("属性移除");
		super.cacheEntryRemoved(event);
		if(!CLUSTER_ORIGIN.equals(event.getOrigin())) {
			sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_DELETE, new QflagCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));
		}
	}

	@Override
	public void cacheEntryUpdated(CacheEntryEvent event) {
		System.out.println("属性更新");
		super.cacheEntryUpdated(event);
		if(!CLUSTER_ORIGIN.equals(event.getOrigin())) {
			sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_UPDATE, new QflagCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));
		}
	}
	
}

 

然后修改配置文件,映射cache.event.listeners类的路径:

cache.event.listeners=com.test.JavaGroupsBroadcastingListenerImpl
cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;\
mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
PING(timeout=2000;num_initial_members=3):\
MERGE2(min_interval=5000;max_interval=10000):\
FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
UNICAST(timeout=300,600,1200,2400):\
pbcast.STABLE(desired_avg_gossip=20000):\
FRAG(frag_size=8096;down_thread=false;up_thread=false):\
pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
cache.cluster.multicast.ip=231.12.21.132

 具体源码可以参考 oscache集群功能  http://rwl6813021.iteye.com/blog/246473,我已经实现了整合。

测试使用两个tomcat6.0,无问题,但是一个tomca6.0,一个tomcat7.0时发现缓存没有同步。。。

分享到:
评论
3 楼 tianhandigeng 2013-03-27  
我的环境UDP也无法使用,我也想用TCP,我照着网上的这种TCP配置,报错,不行:
TCP(start_port=7800):TCPPING(initial_hosts=192.168.0.103[7800],192.168.0.110[7800];port_range=10;timeout=3000;num_initial_members=3;up_thread=true;down_thread=true):VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):pbcast.NAKACK(down_thread=true;up_thread=true;gc_lag=100;retransmit_timeout=3000):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false;down_thread=true;up_thread=true)
报了这样的错误:
com.opensymphony.oscache.base.InitializationException: Initialization failed: org.jgroups.ChannelException: unable to setup the protocol stack
2 楼 qq123zhz 2012-08-18  
timeflowing 写道
非常有用,请教下,oscache集群可以用TCP通信么?我的集群不在一个网段,没法UDP广播

可以的,你找找资料。。。
1 楼 timeflowing 2012-08-14  
非常有用,请教下,oscache集群可以用TCP通信么?我的集群不在一个网段,没法UDP广播

相关推荐

    oscache-java缓存框架

    - **集群支持**:在分布式环境中,osCache可以实现多节点间的缓存同步,确保数据一致性。 - **缓存预热**:允许在应用启动时加载预定义的数据到缓存,提高应用启动速度。 - **缓存监听器**:可以通过监听器来监控...

    oscache对象缓存

    通过使用JMS(Java Message Service)或RMI(Remote Method Invocation)通信协议,oscache可以在集群中的不同节点之间同步缓存状态,确保数据的一致性。 在实际应用中,oscache通常与Spring框架集成,以实现更灵活...

    oscache-2.1.1-full.zip_full_oscache_oscache 2_oscache2

    6. **事件监听**: 提供了缓存事件监听器接口,允许开发者在数据的存取过程中进行额外的操作,如数据同步、日志记录等。 **压缩包内容分析** 1. **docs**: 包含osCache的文档,包括用户手册、API参考等,是学习和...

    oscache缓存

    osCache是Java平台上的一个高效的缓存解决方案,主要用于在应用程序中缓存数据,以提高性能和减少数据库的负载。这个工具特别适用于那些需要频繁访问但更新不频繁的数据,例如经常查询但很少更改的数据库记录。...

    OSCache缓存技术(6)【实例】

    此外,OSCache还支持集群环境下的缓存同步,通过`ClusteredCache`接口,可以在多台服务器之间共享缓存数据,实现分布式缓存。这在大型分布式系统中非常关键,因为它能确保数据的一致性和可用性。 OSCache的配置主要...

    oscache.rar

    在集群环境中,这有助于进行数据同步和业务逻辑处理。 6. **故障转移和容错**:在集群环境中,如果一个节点出现故障,OSCache应能自动将请求重定向到其他正常工作的节点,保证服务的连续性。 7. **性能优化**:...

    OSCache简介

    在集群环境中,OSCache 可以通过 JGroups 实现分布式缓存,使得多个服务器间的数据共享和一致性得以保证。缓存预热是指在应用启动时,预先加载一部分数据到缓存中,以减少用户等待时间。 使用 OSCache 时,开发者...

    oscache2.1_ful

    6. **缓存同步**: 在分布式环境中,OSCache 支持集群间的缓存同步,保证所有节点的数据一致性。 **二、OSCache 的工作原理** OSCache 基于 JVM 内存进行缓存,通过序列化和反序列化对象来存储和读取。当一个对象被...

    应用OSCache提升J2EE系统运行性能

    2. 集群支持:通过分布式缓存机制,不同服务器间的缓存数据可以保持同步,实现负载均衡。 3. 动态更新:当源数据发生变化时,OSCache可以通过监听机制自动更新缓存,保证数据一致性。 4. 缓存策略:支持多种缓存策略...

    Cache技术--OSCache

    OSCache的使用并不复杂,开发者可以通过官方文档进一步了解详细配置和高级功能,例如缓存预热、缓存同步、缓存锁定等,以充分利用其性能优化潜力。 总结,OSCache作为一款强大的缓存框架,不仅提高了Web应用的性能...

    oscache-2.3.jar

    2. **分布式缓存**:支持多节点间的缓存同步,适合于分布式环境,确保在集群中的数据一致性。 3. **缓存策略**:提供了多种缓存策略,如LRU(Least Recently Used)最近最少使用、FIFO(First In First Out)先进先...

    应用OSCache提升J2EE系统.pdf

    5. **分布式缓存**: 在集群环境下,OSCache支持分布式缓存,使得多个节点间的缓存保持同步。这样,无论用户请求哪个节点,都能获取到相同且最新的缓存数据,增强了系统的可扩展性和可用性。 6. **缓存预热**: 开启...

    Liferay集群负载均衡配置

    Liferay的缓存系统(如Velocity Cache或OSCache)也需要在集群间同步,以确保所有节点访问的数据是最新的。这可以通过配置缓存同步策略实现。 6. **热添加/删除节点**: 能够动态添加或移除服务器节点是集群的...

    Java对象缓存系统的实现,实现了LRU算法,并可以进行集群同步

    5. `oscache-2.4.1.jar` 是一个开源的缓存框架,提供了本地缓存和分布式缓存功能,可能在这个项目中用于辅助实现LRU缓存。 6. `commons-logging-1.1.jar` 是Apache Commons Logging库,它是一个日志抽象层,允许在...

    java应用服务器集群环境下代码编写要注意的问题参照.pdf

    此时,需要考虑线程同步和定时器的集群解决方案,如Spring Quartz提供的集群支持。 4. **JDBC连接管理**:对于数据库集群,应正确配置JDBC连接,确保所有服务器都能正确访问数据库。例如,使用数据源(DataSource)...

    RocketMQ消息丢失解决方案:同步刷盘+手动提交.docx

    - **优点**:通过同步刷盘与手动提交的结合使用,可以极大程度地减少消息丢失的概率,保证了系统的稳定性和数据的一致性。这对于涉及金融交易、订单处理等关键业务场景尤为重要。 - **缺点**:实施这样的方案会增加...

    页面缓存的小测试

    OSCache提供了一种便捷的方式来管理Web应用程序中的对象缓存,支持本地缓存和分布式缓存,适用于单服务器和集群环境。其主要特性包括: 1. **自动缓存更新**:OSCache能够监听对象的变化,并自动更新缓存中的内容,...

    二级缓存详解

    在使用二级缓存时,需要注意缓存同步和失效的问题,因为缓存中的数据可能与数据库中的数据不同步。因此,需要正确配置缓存的生命周期和清理策略,以确保数据一致性。 总之,Hibernate的二级缓存是一个强大的工具,...

Global site tag (gtag.js) - Google Analytics