- 浏览: 534513 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
gaolegao2008:
如果报 is_volum 列名找不到之类的,我是从新部署了一个 ...
spring quartz 定时器报错 -
gaolegao2008:
部署到linux上时,还有一种情况就是mysql数据库区分大小 ...
spring quartz 定时器报错 -
qq123zhz:
yahier 写道 对我有帮助,但我看的一个demo程序,却没 ...
spring quartz 定时器报错 -
qq123zhz:
这个要在eclipse的插件环境下运行的,你不懂eclipse ...
GEF 自动布局 -
qq123zhz:
这个很久了,不记得啥时候写的了
json转为Map
cache.event.listeners=com.test.JavaGroupsBroadcastingListenerImpl cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;\ mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\ PING(timeout=2000;num_initial_members=3):\ MERGE2(min_interval=5000;max_interval=10000):\ FD_SOCK:VERIFY_SUSPECT(timeout=1500):\ pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\ UNICAST(timeout=300,600,1200,2400):\ pbcast.STABLE(desired_avg_gossip=20000):\ FRAG(frag_size=8096;down_thread=false;up_thread=false):\ pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true) cache.cluster.multicast.ip=231.12.21.132
oscache 集群同步数据需要使用到JavaGroupsBroadcastingListener,但是JavaGroupsBroadcastingListener 具体没有实现同步的功能,JavaGroupsBroadcastingListener:
public void handleNotification(Serializable serializable) { if (!(serializable instanceof ClusterNotification)) { log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored."); return; } handleClusterNotification((ClusterNotification) serializable); }
handleNotification该方法调用 handleClusterNotification((ClusterNotification) serializable)后,进入AbstractBoardcasting类,改方法只执行刷新当前缓存的内容,对集群数据同步没有实现。
/* * Copyright (c) 2002-2003 by OpenSymphony * All rights reserved. */ package com.opensymphony.oscache.plugins.clustersupport; import com.opensymphony.oscache.base.*; import com.opensymphony.oscache.base.events.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.Date; /** * Implementation of a CacheEntryEventListener. It broadcasts the flush events * across a cluster to other listening caches. Note that this listener cannot * be used in conjection with session caches. * * @version $Revision: 254 $ * @author <a href="mailto:chris@swebtec.com">Chris Miller</a> */ public abstract class AbstractBroadcastingListener implements CacheEntryEventListener, LifecycleAware { private final static Log log = LogFactory.getLog(AbstractBroadcastingListener.class); /** * The name to use for the origin of cluster events. Using this ensures * events are not fired recursively back over the cluster. */ protected static final String CLUSTER_ORIGIN = "CLUSTER"; protected Cache cache = null; public AbstractBroadcastingListener() { if (log.isInfoEnabled()) { log.info("AbstractBroadcastingListener registered"); } } /** * Event fired when an entry is flushed from the cache. This broadcasts * the flush message to any listening nodes on the network. */ public void cacheEntryFlushed(CacheEntryEvent event) { if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) { if (log.isDebugEnabled()) { log.debug("cacheEntryFlushed called (" + event + ")"); } sendNotification(new ClusterNotification(ClusterNotification.FLUSH_KEY, event.getKey())); } } /** * Event fired when an entry is removed from the cache. This broadcasts * the remove method to any listening nodes on the network, as long as * this event wasn't from a broadcast in the first place. */ public void cacheGroupFlushed(CacheGroupEvent event) { if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) { if (log.isDebugEnabled()) { log.debug("cacheGroupFushed called (" + event + ")"); } sendNotification(new ClusterNotification(ClusterNotification.FLUSH_GROUP, event.getGroup())); } } public void cachePatternFlushed(CachePatternEvent event) { if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) { if (log.isDebugEnabled()) { log.debug("cachePatternFushed called (" + event + ")"); } sendNotification(new ClusterNotification(ClusterNotification.FLUSH_PATTERN, event.getPattern())); } } public void cacheFlushed(CachewideEvent event) { if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) { if (log.isDebugEnabled()) { log.debug("cacheFushed called (" + event + ")"); } sendNotification(new ClusterNotification(ClusterNotification.FLUSH_CACHE, event.getDate())); } } // -------------------------------------------------------- // The remaining events are of no interest to this listener // -------------------------------------------------------- public void cacheEntryAdded(CacheEntryEvent event) { } public void cacheEntryRemoved(CacheEntryEvent event) { } public void cacheEntryUpdated(CacheEntryEvent event) { } public void cacheGroupAdded(CacheGroupEvent event) { } public void cacheGroupEntryAdded(CacheGroupEvent event) { } public void cacheGroupEntryRemoved(CacheGroupEvent event) { } public void cacheGroupRemoved(CacheGroupEvent event) { } public void cacheGroupUpdated(CacheGroupEvent event) { } /** * Called by the cache administrator class when a cache is instantiated. * * @param cache the cache instance that this listener is attached to. * @param config The cache's configuration details. This allows the event handler * to initialize itself based on the cache settings, and also to receive <em>additional</em> * settings that were part of the cache configuration but that the cache * itself does not care about. If you are using <code>cache.properties</code> * for your configuration, simply add any additional properties that your event * handler requires and they will be passed through in this parameter. * * @throws InitializationException thrown when there was a problem initializing the * listener. The cache administrator will log this error and disable the listener. */ public void initialize(Cache cache, Config config) throws InitializationException { this.cache = cache; } /** * Handles incoming notification messages. This method should be called by the * underlying broadcasting implementation when a message is received from another * node in the cluster. * * @param message The incoming cluster notification message object. */ public void handleClusterNotification(ClusterNotification message) { if (cache == null) { log.warn("A cluster notification (" + message + ") was received, but no cache is registered on this machine. Notification ignored."); return; } if (log.isInfoEnabled()) { log.info("Cluster notification (" + message + ") was received."); } switch (message.getType()) { case ClusterNotification.FLUSH_KEY: cache.flushEntry((String) message.getData(), CLUSTER_ORIGIN); break; case ClusterNotification.FLUSH_GROUP: cache.flushGroup((String) message.getData(), CLUSTER_ORIGIN); break; case ClusterNotification.FLUSH_PATTERN: cache.flushPattern((String) message.getData(), CLUSTER_ORIGIN); break; case ClusterNotification.FLUSH_CACHE: cache.flushAll((Date) message.getData(), CLUSTER_ORIGIN); break; default: log.error("The cluster notification (" + message + ") is of an unknown type. Notification ignored."); } } /** * Called when a cluster notification message is to be broadcast. Implementing * classes should use their underlying transport to broadcast the message across * the cluster. * * @param message The notification message to broadcast. */ abstract protected void sendNotification(ClusterNotification message); }
所以我们要集群式同步数据,必须实现这三个方法:
public void cacheEntryAdded(CacheEntryEvent event) { } public void cacheEntryRemoved(CacheEntryEvent event) { } public void cacheEntryUpdated(CacheEntryEvent event) { }
这个实现类实现了以上的三个方法,就可以同步数据了:
package com.test; import com.opensymphony.oscache.base.events.CacheEntryEvent; import com.opensymphony.oscache.plugins.clustersupport.ClusterNotification; import com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener; public class JavaGroupsBroadcastingListenerImpl extends JavaGroupsBroadcastingListener { public void handleClusterNotification(ClusterNotification message) { switch (message.getType()) { case CacheConstants.CLUSTER_ENTRY_ADD: System.out.println("集群新增:" + message.getData()); if(message.getData() instanceof QflagCacheEvent) { QflagCacheEvent event = (QflagCacheEvent)message.getData(); cache.putInCache(event.getKey(), event.getEntry().getContent(),null,null,CLUSTER_ORIGIN); } break; case CacheConstants.CLUSTER_ENTRY_UPDATE: System.out.println("集群更新:" + message.getData()); if(message.getData() instanceof QflagCacheEvent) { QflagCacheEvent event = (QflagCacheEvent)message.getData(); // cache.flushEntry(event.getKey()); cache.putInCache(event.getKey(), event.getEntry().getContent(),null,null,CLUSTER_ORIGIN); } break; case CacheConstants.CLUSTER_ENTRY_DELETE: System.out.println("集群删除:" + message.getData()); if(message.getData() instanceof QflagCacheEvent) { QflagCacheEvent event = (QflagCacheEvent)message.getData(); // cache.removeEntry(event.getKey(),event.getOrigin()); cache.removeEntry(event.getKey()); } break; } } public void cacheEntryAdded(CacheEntryEvent event) { super.cacheEntryAdded(event); System.out.println("属性添加"); if(!CLUSTER_ORIGIN.equals(event.getOrigin())) { sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_ADD, new QflagCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN))); } } // @Override // public void cacheEntryFlushed(CacheEntryEvent event) { // // super.cacheEntryFlushed(event); // if(!CLUSTER_ORIGIN.equals(event.getOrigin())) { // sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_ADD, new UcallCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN))); // } // } @Override public void cacheEntryRemoved(CacheEntryEvent event) { System.out.println("属性移除"); super.cacheEntryRemoved(event); if(!CLUSTER_ORIGIN.equals(event.getOrigin())) { sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_DELETE, new QflagCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN))); } } @Override public void cacheEntryUpdated(CacheEntryEvent event) { System.out.println("属性更新"); super.cacheEntryUpdated(event); if(!CLUSTER_ORIGIN.equals(event.getOrigin())) { sendNotification(new ClusterNotification(CacheConstants.CLUSTER_ENTRY_UPDATE, new QflagCacheEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN))); } } }
然后修改配置文件,映射cache.event.listeners类的路径:
cache.event.listeners=com.test.JavaGroupsBroadcastingListenerImpl cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;\ mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\ PING(timeout=2000;num_initial_members=3):\ MERGE2(min_interval=5000;max_interval=10000):\ FD_SOCK:VERIFY_SUSPECT(timeout=1500):\ pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\ UNICAST(timeout=300,600,1200,2400):\ pbcast.STABLE(desired_avg_gossip=20000):\ FRAG(frag_size=8096;down_thread=false;up_thread=false):\ pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true) cache.cluster.multicast.ip=231.12.21.132
具体源码可以参考 oscache集群功能 http://rwl6813021.iteye.com/blog/246473,我已经实现了整合。
测试使用两个tomcat6.0,无问题,但是一个tomca6.0,一个tomcat7.0时发现缓存没有同步。。。
- status.zip (2.4 MB)
- 下载次数: 339
评论
3 楼
tianhandigeng
2013-03-27
我的环境UDP也无法使用,我也想用TCP,我照着网上的这种TCP配置,报错,不行:
TCP(start_port=7800):TCPPING(initial_hosts=192.168.0.103[7800],192.168.0.110[7800];port_range=10;timeout=3000;num_initial_members=3;up_thread=true;down_thread=true):VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):pbcast.NAKACK(down_thread=true;up_thread=true;gc_lag=100;retransmit_timeout=3000):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false;down_thread=true;up_thread=true)
报了这样的错误:
com.opensymphony.oscache.base.InitializationException: Initialization failed: org.jgroups.ChannelException: unable to setup the protocol stack
TCP(start_port=7800):TCPPING(initial_hosts=192.168.0.103[7800],192.168.0.110[7800];port_range=10;timeout=3000;num_initial_members=3;up_thread=true;down_thread=true):VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):pbcast.NAKACK(down_thread=true;up_thread=true;gc_lag=100;retransmit_timeout=3000):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false;down_thread=true;up_thread=true)
报了这样的错误:
com.opensymphony.oscache.base.InitializationException: Initialization failed: org.jgroups.ChannelException: unable to setup the protocol stack
2 楼
qq123zhz
2012-08-18
timeflowing 写道
非常有用,请教下,oscache集群可以用TCP通信么?我的集群不在一个网段,没法UDP广播
可以的,你找找资料。。。
1 楼
timeflowing
2012-08-14
非常有用,请教下,oscache集群可以用TCP通信么?我的集群不在一个网段,没法UDP广播
发表评论
-
eclipse Resource 资料
2016-11-15 16:51 617IWorkspace/IWorkspaceRoot/IPro ... -
.docker/machine/machines/default/ca.pem: no such file or directory
2016-09-06 15:59 886Was doing adocker-machine env ... -
list.AddAll 去重复
2015-09-11 12:01 5901问题描述: 有List A和B,A、B中元素都是可保证 ... -
eclipse 插件开发 Setting the Java build path
2014-06-04 11:00 1216JDT Plug-in Developer Guide & ... -
xstream 下划线_问题
2013-12-30 10:18 4180最近在使用xtream 1.4.3,出现了如下的问题: ... -
RCP MessageConsole设置显示的最大行数
2013-09-05 11:34 966MessageConsole.setWaterMarks(5 ... -
freemarker 自定义freeMarker标签
2013-08-12 16:09 3886import java.io.IOException; i ... -
xstream javabean设置属性默认值的问题
2013-04-24 09:54 6568在xstream反序列化使用过程中发现,如果xml无该属 ... -
在使用xstream反序列化时遇到的问题
2013-04-24 09:42 1747public abstract class SBase { ... -
RCP 知识点
2013-03-14 15:10 997获得工作区的所有工程: //获得workspace的所有 ... -
RCP FileSystem 文件系统
2013-02-19 10:42 1381public static File toLocalFile ... -
json转为Map
2013-01-19 22:27 32829package digu.pendant.util; ... -
eclipse4.x 去掉quick access
2013-01-11 14:57 4057/** * 去掉quick access * ... -
xstream 的高级用法,自定义输出结构
2012-12-19 14:35 2423public static void main(Stri ... -
jdt 核心知识
2012-11-27 21:39 1279jdt官方核心知识...................... ... -
jdt 创建java工程,生成代码,运行main方法
2012-11-27 10:50 2345public static IJavaProje ... -
两个osgi的例子程序
2012-11-20 10:21 1077osgi的例子....................直接上代 ... -
jfreechart 的官方例子,很全
2012-11-20 10:19 944jfreechart的官方的demon,很全。 -
SWT 隔行换色-自动宽高调整
2012-10-16 17:32 1710** * 创建:ZhengXi 2009-8-4 */ ... -
RCP 为action添加操作进度条
2012-10-16 13:59 1292public class StartAction extend ...
相关推荐
- **集群支持**:在分布式环境中,osCache可以实现多节点间的缓存同步,确保数据一致性。 - **缓存预热**:允许在应用启动时加载预定义的数据到缓存,提高应用启动速度。 - **缓存监听器**:可以通过监听器来监控...
通过使用JMS(Java Message Service)或RMI(Remote Method Invocation)通信协议,oscache可以在集群中的不同节点之间同步缓存状态,确保数据的一致性。 在实际应用中,oscache通常与Spring框架集成,以实现更灵活...
6. **事件监听**: 提供了缓存事件监听器接口,允许开发者在数据的存取过程中进行额外的操作,如数据同步、日志记录等。 **压缩包内容分析** 1. **docs**: 包含osCache的文档,包括用户手册、API参考等,是学习和...
osCache是Java平台上的一个高效的缓存解决方案,主要用于在应用程序中缓存数据,以提高性能和减少数据库的负载。这个工具特别适用于那些需要频繁访问但更新不频繁的数据,例如经常查询但很少更改的数据库记录。...
此外,OSCache还支持集群环境下的缓存同步,通过`ClusteredCache`接口,可以在多台服务器之间共享缓存数据,实现分布式缓存。这在大型分布式系统中非常关键,因为它能确保数据的一致性和可用性。 OSCache的配置主要...
在集群环境中,这有助于进行数据同步和业务逻辑处理。 6. **故障转移和容错**:在集群环境中,如果一个节点出现故障,OSCache应能自动将请求重定向到其他正常工作的节点,保证服务的连续性。 7. **性能优化**:...
在集群环境中,OSCache 可以通过 JGroups 实现分布式缓存,使得多个服务器间的数据共享和一致性得以保证。缓存预热是指在应用启动时,预先加载一部分数据到缓存中,以减少用户等待时间。 使用 OSCache 时,开发者...
6. **缓存同步**: 在分布式环境中,OSCache 支持集群间的缓存同步,保证所有节点的数据一致性。 **二、OSCache 的工作原理** OSCache 基于 JVM 内存进行缓存,通过序列化和反序列化对象来存储和读取。当一个对象被...
2. 集群支持:通过分布式缓存机制,不同服务器间的缓存数据可以保持同步,实现负载均衡。 3. 动态更新:当源数据发生变化时,OSCache可以通过监听机制自动更新缓存,保证数据一致性。 4. 缓存策略:支持多种缓存策略...
OSCache的使用并不复杂,开发者可以通过官方文档进一步了解详细配置和高级功能,例如缓存预热、缓存同步、缓存锁定等,以充分利用其性能优化潜力。 总结,OSCache作为一款强大的缓存框架,不仅提高了Web应用的性能...
2. **分布式缓存**:支持多节点间的缓存同步,适合于分布式环境,确保在集群中的数据一致性。 3. **缓存策略**:提供了多种缓存策略,如LRU(Least Recently Used)最近最少使用、FIFO(First In First Out)先进先...
5. **分布式缓存**: 在集群环境下,OSCache支持分布式缓存,使得多个节点间的缓存保持同步。这样,无论用户请求哪个节点,都能获取到相同且最新的缓存数据,增强了系统的可扩展性和可用性。 6. **缓存预热**: 开启...
Liferay的缓存系统(如Velocity Cache或OSCache)也需要在集群间同步,以确保所有节点访问的数据是最新的。这可以通过配置缓存同步策略实现。 6. **热添加/删除节点**: 能够动态添加或移除服务器节点是集群的...
5. `oscache-2.4.1.jar` 是一个开源的缓存框架,提供了本地缓存和分布式缓存功能,可能在这个项目中用于辅助实现LRU缓存。 6. `commons-logging-1.1.jar` 是Apache Commons Logging库,它是一个日志抽象层,允许在...
此时,需要考虑线程同步和定时器的集群解决方案,如Spring Quartz提供的集群支持。 4. **JDBC连接管理**:对于数据库集群,应正确配置JDBC连接,确保所有服务器都能正确访问数据库。例如,使用数据源(DataSource)...
- **优点**:通过同步刷盘与手动提交的结合使用,可以极大程度地减少消息丢失的概率,保证了系统的稳定性和数据的一致性。这对于涉及金融交易、订单处理等关键业务场景尤为重要。 - **缺点**:实施这样的方案会增加...
OSCache提供了一种便捷的方式来管理Web应用程序中的对象缓存,支持本地缓存和分布式缓存,适用于单服务器和集群环境。其主要特性包括: 1. **自动缓存更新**:OSCache能够监听对象的变化,并自动更新缓存中的内容,...
在使用二级缓存时,需要注意缓存同步和失效的问题,因为缓存中的数据可能与数据库中的数据不同步。因此,需要正确配置缓存的生命周期和清理策略,以确保数据一致性。 总之,Hibernate的二级缓存是一个强大的工具,...