1服务Provider
1.1自动发现配置
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
properties="peerDiscovery=automatic, multicastGroupAddress=230.0.0.1,
multicastGroupPort=4446, timeToLive=32"/>
1.2手动发现配置
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
properties="peerDiscovery=manual,rmiUrls=//server2:40001/sampleCache11|//server2:40001/sampleCache12"/>
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
properties="peerDiscovery=manual,rmiUrls=//server1:40001/sampleCache11|//server1:40001/sampleCache12"/>
1.3源码分析-RMICacheManagerPeerProviderFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public CacheManagerPeerProvider createCachePeerProvider(CacheManager cacheManager, Properties properties)
throws CacheException {
String peerDiscovery = PropertyUtil.extractAndLogProperty(PEER_DISCOVERY, properties);
if (peerDiscovery == null || peerDiscovery.equalsIgnoreCase(AUTOMATIC_PEER_DISCOVERY)) {
try {
return createAutomaticallyConfiguredCachePeerProvider(cacheManager, properties);
} catch (IOException e) {
throw new CacheException( "Could not create CacheManagerPeerProvider. Initial cause was " + e.getMessage(), e);
}
} else if (peerDiscovery.equalsIgnoreCase(MANUALLY_CONFIGURED_PEER_DISCOVERY)) {
return createManuallyConfiguredCachePeerProvider(properties);
} else {
return null ;
}
}
protected CacheManagerPeerProvider createManuallyConfiguredCachePeerProvider(Properties properties) {
String rmiUrls = PropertyUtil.extractAndLogProperty(RMI_URLS, properties);
if (rmiUrls == null || rmiUrls.length() == 0 ) {
LOG.info( "Starting manual peer provider with empty list of peers. " +
"No replication will occur unless peers are added." );
rmiUrls = new String();
}
rmiUrls = rmiUrls.trim();
StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls, PayloadUtil.URL_DELIMITER);
RMICacheManagerPeerProvider rmiPeerProvider = new ManualRMICacheManagerPeerProvider();
while (stringTokenizer.hasMoreTokens()) {
String rmiUrl = stringTokenizer.nextToken();
rmiUrl = rmiUrl.trim();
rmiPeerProvider.registerPeer(rmiUrl);
LOG.debug( "Registering peer {}" , rmiUrl);
}
return rmiPeerProvider;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public final synchronized void registerPeer(String rmiUrl) {
peerUrls.put(rmiUrl, new Date());
}
public final synchronized List listRemoteCachePeers(Ehcache cache) throws CacheException {
List remoteCachePeers = new ArrayList();
List staleList = new ArrayList();
for (Iterator iterator = peerUrls.keySet().iterator(); iterator.hasNext();) {
String rmiUrl = (String) iterator.next();
String rmiUrlCacheName = extractCacheName(rmiUrl);
if (!rmiUrlCacheName.equals(cache.getName())) {
continue ;
}
Date date = (Date) peerUrls.get(rmiUrl);
if (!stale(date)) {
CachePeer cachePeer = null ;
try {
cachePeer = lookupRemoteCachePeer(rmiUrl);
remoteCachePeers.add(cachePeer);
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug( "Looking up rmiUrl " + rmiUrl + " through exception " + e.getMessage()
+ ". This may be normal if a node has gone offline. Or it may indicate network connectivity"
+ " difficulties" , e);
}
}
} else {
LOG.debug( "rmiUrl {} should never be stale for a manually configured cluster." , rmiUrl);
staleList.add(rmiUrl);
}
}
//Remove any stale remote peers. Must be done here to avoid concurrent modification exception.
for ( int i = 0 ; i < staleList.size(); i++) {
String rmiUrl = (String) staleList.get(i);
peerUrls.remove(rmiUrl);
}
return remoteCachePeers;
}
public CachePeer lookupRemoteCachePeer(String url) throws MalformedURLException, NotBoundException, RemoteException {
LOG.debug( "Lookup URL {}" , url);
CachePeer cachePeer = (CachePeer) Naming.lookup(url);
return cachePeer;
}
|
1
2
3
4
5
6
7
8
9
10
11
|
public MulticastRMICacheManagerPeerProvider(CacheManager cacheManager, InetAddress groupMulticastAddress,
Integer groupMulticastPort, Integer timeToLive, InetAddress hostAddress) {
super (cacheManager);
heartBeatReceiver = new MulticastKeepaliveHeartbeatReceiver( this , groupMulticastAddress,
groupMulticastPort, hostAddress);
heartBeatSender = new MulticastKeepaliveHeartbeatSender(cacheManager, groupMulticastAddress,
groupMulticastPort, timeToLive, hostAddress);
}
|
2服务Listener
2.1配置文件
<cacheManagerPeerListenerFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory"
properties="hostName=localhost, port=40001,
socketTimeoutMillis=2000"/>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public final CacheManagerPeerListener createCachePeerListener(CacheManager cacheManager, Properties properties)
throws CacheException {
String hostName = PropertyUtil.extractAndLogProperty(HOSTNAME, properties);
String portString = PropertyUtil.extractAndLogProperty(PORT, properties);
Integer port = null ;
if (portString != null && portString.length() != 0 ) {
port = Integer.valueOf(portString);
} else {
port = Integer.valueOf( 0 );
}
//0 means any port in UnicastRemoteObject, so it is ok if not specified to make it 0
String remoteObjectPortString = PropertyUtil.extractAndLogProperty(REMOTE_OBJECT_PORT, properties);
Integer remoteObjectPort = null ;
if (remoteObjectPortString != null && remoteObjectPortString.length() != 0 ) {
remoteObjectPort = Integer.valueOf(remoteObjectPortString);
} else {
remoteObjectPort = Integer.valueOf( 0 );
}
String socketTimeoutMillisString = PropertyUtil.extractAndLogProperty(SOCKET_TIMEOUT_MILLIS, properties);
Integer socketTimeoutMillis;
if (socketTimeoutMillisString == null || socketTimeoutMillisString.length() == 0 ) {
socketTimeoutMillis = DEFAULT_SOCKET_TIMEOUT_MILLIS;
} else {
socketTimeoutMillis = Integer.valueOf(socketTimeoutMillisString);
}
return doCreateCachePeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis);
}
protected CacheManagerPeerListener doCreateCachePeerListener(String hostName,
Integer port,
Integer remoteObjectPort,
CacheManager cacheManager,
Integer socketTimeoutMillis) {
try {
return new RMICacheManagerPeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis);
} catch (UnknownHostException e) {
throw new CacheException( "Unable to create CacheManagerPeerListener. Initial cause was " + e.getMessage(), e);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
public void init() throws CacheException {
if (!status.equals(Status.STATUS_UNINITIALISED)) {
return ;
}
RMICachePeer rmiCachePeer = null ;
try {
startRegistry();
int counter = 0 ;
populateListOfRemoteCachePeers();
synchronized (cachePeers) {
for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) {
rmiCachePeer = (RMICachePeer) iterator.next();
bind(rmiCachePeer.getUrl(), rmiCachePeer);
counter++;
}
}
LOG.debug(counter + " RMICachePeers bound in registry for RMI listener" );
status = Status.STATUS_ALIVE;
} catch (Exception e) {
String url = null ;
if (rmiCachePeer != null ) {
url = rmiCachePeer.getUrl();
}
throw new CacheException( "Problem starting listener for RMICachePeer "
+ url + ". Initial cause was " + e.getMessage(), e);
}
}
protected void startRegistry() throws RemoteException {
try {
registry = LocateRegistry.getRegistry(port.intValue());
try {
registry.list();
} catch (RemoteException e) {
//may not be created. Let's create it.
registry = LocateRegistry.createRegistry(port.intValue());
registryCreated = true ;
}
} catch (ExportException exception) {
LOG.error( "Exception starting RMI registry. Error was " + exception.getMessage(), exception);
}
}
protected void populateListOfRemoteCachePeers() throws RemoteException {
String[] names = cacheManager.getCacheNames();
for ( int i = 0 ; i < names.length; i++) {
String name = names[i];
Ehcache cache = cacheManager.getEhcache(name);
synchronized (cachePeers) {
if (cachePeers.get(name) == null ) {
if (isDistributed(cache)) {
RMICachePeer peer = new RMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
cachePeers.put(name, peer);
}
}
}
}
}
|
3 事件Listener
3.1配置文件
<!-- Sample cache named sampleCache2. -->
<cache name ="sampleCache2"
maxEntriesLocalHeap ="10"
eternal="false"
timeToIdleSeconds ="100"
timeToLiveSeconds ="100"
overflowToDisk="false" >
<cacheEventListenerFactory
class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true,
replicateUpdatesViaCopy=false, replicateRemovals=true "/>
</cache>
<!-- Sample cache named sampleCache4. All missing RMICacheReplicatorFactory properties
default to true --><cachename="sampleCache4"maxEntriesLocalHeap="10"eternal="true"overflowToDisk="false"memoryStoreEvictionPolicy="LFU"><cacheEventListenerFactoryclass="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/></cache>
3.2源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public final CacheEventListener createCacheEventListener(Properties properties) {
boolean replicatePuts = extractReplicatePuts(properties);
boolean replicatePutsViaCopy = extractReplicatePutsViaCopy(properties);
boolean replicateUpdates = extractReplicateUpdates(properties);
boolean replicateUpdatesViaCopy = extractReplicateUpdatesViaCopy(properties);
boolean replicateRemovals = extractReplicateRemovals(properties);
boolean replicateAsynchronously = extractReplicateAsynchronously(properties);
int asynchronousReplicationIntervalMillis = extractReplicationIntervalMilis(properties);
if (replicateAsynchronously) {
return new RMIAsynchronousCacheReplicator(
replicatePuts,
replicatePutsViaCopy,
replicateUpdates,
replicateUpdatesViaCopy,
replicateRemovals,
asynchronousReplicationIntervalMillis);
} else {
return new RMISynchronousCacheReplicator(
replicatePuts,
replicatePutsViaCopy,
replicateUpdates,
replicateUpdatesViaCopy,
replicateRemovals);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
/**
* Whether a put should replicated by copy or by invalidation, (a remove).
* <p/>
* By copy is best when the entry is expensive to produce. By invalidation is best when
* we are really trying to force other caches to sync back to a canonical source like a database.
* An example of a latter usage would be a read/write cache being used in Hibernate.
* <p/>
* This setting only has effect if <code>#replicateUpdates</code> is true.
*/
protected boolean replicatePutsViaCopy;
public void notifyElementPut( final Ehcache cache, final Element element) throws CacheException {
if (notAlive()) {
return ;
}
if (!replicatePuts) {
return ;
}
if (!element.isSerializable()) {
if (LOG.isWarnEnabled()) {
LOG.warn( "Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated" );
}
return ;
}
if (replicatePutsViaCopy) {
replicatePutNotification(cache, element);
} else {
replicateRemovalNotification(cache, (Serializable) element.getObjectKey());
}
}
protected static void replicatePutNotification(Ehcache cache, Element element) throws RemoteCacheException {
List cachePeers = listRemoteCachePeers(cache);
for (Object cachePeer1 : cachePeers) {
CachePeer cachePeer = (CachePeer) cachePeer1;
try {
cachePeer.put(element);
} catch (Throwable t) {
LOG.error( "Exception on replication of putNotification. " + t.getMessage() + ". Continuing..." , t);
}
}
}
static List listRemoteCachePeers(Ehcache cache) {
CacheManagerPeerProvider provider = cache.getCacheManager().getCacheManagerPeerProvider( "RMI" );
return provider.listRemoteCachePeers(cache);
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
public final void notifyElementPut( final Ehcache cache, final Element element) throws CacheException {
if (notAlive()) {
return ;
}
if (!replicatePuts) {
return ;
}
if (replicatePutsViaCopy) {
if (!element.isSerializable()) {
if (LOG.isWarnEnabled()) {
LOG.warn( "Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated." );
}
return ;
}
addToReplicationQueue( new CacheEventMessage(EventMessage.PUT, cache, element, null ));
} else {
if (!element.isKeySerializable()) {
if (LOG.isWarnEnabled()) {
LOG.warn( "Object with key " + element.getObjectKey()
+ " does not have a Serializable key and cannot be replicated via invalidate." );
}
return ;
}
addToReplicationQueue( new CacheEventMessage(EventMessage.REMOVE, cache, null , element.getKey()));
}
}
protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
if (!replicationThread.isAlive()) {
LOG.error( "CacheEventMessages cannot be added to the replication queue because the replication thread has died." );
} else {
synchronized (replicationQueue) {
replicationQueue.add(cacheEventMessage);
}
}
}
private final class ReplicationThread extends Thread {
public ReplicationThread() {
super ( "Replication Thread" );
setDaemon( true );
setPriority(Thread.NORM_PRIORITY);
}
public final void run() {
replicationThreadMain();
}
}
private void replicationThreadMain() {
while ( true ) {
// Wait for elements in the replicationQueue
while (alive() && replicationQueue != null && replicationQueue.size() == 0 ) {
try {
Thread.sleep(asynchronousReplicationInterval);
} catch (InterruptedException e) {
LOG.debug( "Spool Thread interrupted." );
return ;
}
}
if (notAlive()) {
return ;
}
try {
if (replicationQueue.size() != 0 ) {
flushReplicationQueue();
}
} catch (Throwable e) {
LOG.error( "Exception on flushing of replication queue: " + e.getMessage() + ". Continuing..." , e);
}
}
}
private void flushReplicationQueue() {
List replicationQueueCopy;
synchronized (replicationQueue) {
if (replicationQueue.size() == 0 ) {
return ;
}
replicationQueueCopy = new ArrayList(replicationQueue);
replicationQueue.clear();
}
Ehcache cache = ((CacheEventMessage) replicationQueueCopy.get( 0 )).cache;
List cachePeers = listRemoteCachePeers(cache);
List resolvedEventMessages = extractAndResolveEventMessages(replicationQueueCopy);
for ( int j = 0 ; j < cachePeers.size(); j++) {
CachePeer cachePeer = (CachePeer) cachePeers.get(j);
try {
cachePeer.send(resolvedEventMessages);
} catch (UnmarshalException e) {
String message = e.getMessage();
if (message.indexOf( "Read time out" ) != 0 ) {
LOG.warn( "Unable to send message to remote peer due to socket read timeout. Consider increasing" +
" the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " +
"Message was: " + e.getMessage());
} else {
LOG.debug( "Unable to send message to remote peer. Message was: " + e.getMessage());
}
} catch (Throwable t) {
LOG.warn( "Unable to send message to remote peer. Message was: " + t.getMessage(), t);
}
}
if (LOG.isWarnEnabled()) {
int eventMessagesNotResolved = replicationQueueCopy.size() - resolvedEventMessages.size();
if (eventMessagesNotResolved > 0 ) {
LOG.warn(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " +
"SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " +
"starting heap size to a higher value." );
}
}
}
|
相关推荐
6. **监控与优化**:在运行时,需要监控Ehcache RMI集群的性能,包括网络延迟、同步效率、内存使用等。根据实际情况可能需要调整复制策略、缓存大小、RMI超时等参数以优化性能。 使用Ehcache RMI Replicated ...
在本实例中,我们将探讨如何配置和使用Ehcache的集群功能,以及涉及的JGroups和RMI技术。 1. **Ehcache集群**:Ehcache集群使得多台服务器上的多个Ehcache实例能够共享数据。这通过分布式缓存实现,其中的缓存项...
通过分析和运行这个Demo,开发者可以学习到RMI的生命周期管理、Ehcache的缓存机制,以及如何将两者结合起来,实现高效、可扩展的分布式系统。同时,这也会涉及到序列化、网络通信、多线程等基础知识,对于提升Java...
在这个“ehcache rmi集群demo”中,我们将探讨如何将Ehcache与RMI结合,实现一个跨节点的缓存集群。 首先,Ehcache的核心概念包括缓存管理器(Cache Manager)、缓存(Cache)、缓存项(Cache Entry)等。缓存管理...
在Ehcache集群方案中,一个关键组件是JGroups。JGroups是一个用于构建集群通信的框架,它允许节点之间进行可靠的消息传递。Ehcache利用JGroups来实现节点间的通信和数据同步,确保即使在某个节点失败时,数据也能在...
ehcache提供三种网络连接策略来实现集群,rmi,jgroup还有jms。这里只说rmi方式。同时ehcache可以可以实现多播的方式实现集群。也可以手动指定集群主机序列实现集群,本例应用手动指定。
源码分析可以帮助我们深入了解其内部机制,优化使用方式,甚至进行定制化开发。在本篇文章中,我们将探讨Ehcache的核心概念、工作原理以及关键组件。 首先,Ehcache的核心功能是提供内存缓存,它将频繁访问的数据...
提示了“使用复制”、“支持的复制类型”、“复制的最小配置”、“向现有缓存添加复制”、“使用RMI实现复制缓存”、“配置对等提供者”等关键部分,这些部分应该是手册的主体内容,分别阐述了如何实现Ehcache的远程...
本文将深入探讨四种知名的缓存解决方案:ShiftOne Cache、SwarmCache、EHCache以及JCS(Java Caching System),并分析它们的源码,以理解其工作原理和优化策略。 首先,让我们来看看ShiftOne Cache。这是一个轻量...
源码分析: 1. **内存缓存**:EhCache的核心是内存缓存,它使用HashMap存储缓存项。每个缓存项包含键和值,以及一些元数据如创建时间、过期时间等。通过高效的哈希映射,EhCache能够快速定位并检索数据。 2. **...
**Ehcache RMI 手动集群示例详解** Ehcache是一款广泛使用的Java缓存系统,它提供了高效、可扩展的内存缓存解决方案。在分布式环境中,为了实现数据的一致性和高可用性,Ehcache支持通过远程方法调用(RMI)进行...
在集群环境中,EhCache通过RMI(远程方法调用)或JGroups协议实现节点间的通信,保证了缓存数据的一致性。 二、EhCache在集群环境中的应用 1. 数据共享:在集群环境中,多个服务器节点可以共享同一份缓存数据,减少...
本示例中,"RMI 服务器与客户端源码"很可能是为了教学目的设计的,适合初学者了解RMI的基本工作原理和实践操作。 创建RMI服务器主要涉及以下步骤: 1. **定义远程接口**:远程接口是一个Java接口,其中声明了所有...
Spring RMI(Remote Method Invocation)...通过分析服务端和客户端的源码,我们可以深入理解如何在Spring框架下实现和使用RMI远程调用,这对于任何希望在分布式系统中利用Spring的开发者来说都是一份宝贵的参考资料。
这个"java rmi HelloWorld版(源码)"的压缩包文件提供了一个简单的RMI应用示例,帮助开发者了解和学习RMI的基本原理和使用。 RMI的核心概念包括: 1. **远程接口(Remote Interface)**:这是定义远程方法的接口...
在"RMI-IIOP Java 源码实例.rar"中,你可能找到以下内容: 1. **源代码**:包含了实现RMI-IIOP的Java类,包括服务器端的EJB实现、客户端的调用逻辑以及必要的接口定义。 2. **BAT批处理命令**:这些批处理文件可能...
对于复杂的缓存问题,分析源码有时是解决问题的最快途径。 至于“工具”,Ehcache生态系统中有许多辅助工具,如`Ehcache Cache Manager`,它可以帮助我们创建、修改和删除缓存实例;还有`Ehcache CLI`,一个命令行...
这个“rmi入门(带源码)”的资源可能是一个教学资料或实践项目,帮助初学者理解和应用RMI技术。 首先,我们来详细了解一下RMI的基本概念: 1. **远程接口(Remote Interface)**:这是定义远程方法的接口。它继承了...
Java Remote Method Invocation (RMI) 是Java平台上的一个特性,它允许分布式系统中的对象调用彼此的方法,即使这些对象位于不同的 JVM(Java Virtual Machine)上。这个技术在开发分布式应用程序时非常有用,尤其是...