- 浏览: 484350 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
龘龘龘:
TrueBrian 写道有个问题,Sample 1中,为了控制 ...
What's New on Java 7 Phaser -
龘龘龘:
楼主总结的不错。
What's New on Java 7 Phaser -
TrueBrian:
有个问题,Sample 1中,为了控制线程的启动时机,博主实际 ...
What's New on Java 7 Phaser -
liguanqun811:
不知道楼主是否对zookeeper实现的分布式锁进行过性能测试 ...
Distributed Lock -
hobitton:
mysql的get lock有版本限制,否则get lock可 ...
Distributed Lock
3 Building Blocks
Building blocks位于org.jgroups.blocks包中,在逻辑上可以视为channels之上的一层,它提供了更复杂的接口。Building blocks并不必依赖于channels,部分building blocks只需要实现了Transport接口的类即可工作。以下简要介绍部分building blocks。
3.1 MessageDispatcher
Channels 通常用于异步地发送和接收消息。然后有些情况下需要同步通信,例如发送者希望向集群发送消息并等待所有成员的应答,或者等待部分成员的应答。MessageDispatcher支持以同步或者异步的方式发送消息,它在构造时需要一个Channel型的参数。
MessageDispatcher提供了Object handle(Message msg)方法,用于以push 方式的接收消息并返回应答(必须可以被序列化),该方法抛出的异常也会被传播到消息发送者。MessageDispatcher在内部使用了PullPushAdapter,PullPushAdapter也是org.jgroups.blocks包中的类,但是已经被标记为deprecated。这种方式被称为MessageDispatcher的server模式。
MessageDispatcher的client模式是指通过调用castMessage或者sendMessage向集群发送消息并同步或者异步的等待应答。castMessage()方法向dests指定的地址发送消息,如果dest为null,那么向集群中所有成员发送消息。castMessage()方法的返回值是RspList,RspList 实现了Map<Address,Rsp> 接口。msg参数中的目的地址会被覆盖。mode参数(由org.jgroups.blocks.GroupRequest类定义)指定了消息是同步还是异步发送,其可选值如下:
- GET_FIRST 返回收到的第一个应答。
- GET_ALL 等待所有成员的应答(被怀疑崩溃的成员除外)。
- GET_MAJORITY 等待绝大多数成员(相对与成员的个数)的应答。
- GET_ABS_MAJORITY等待绝大多数成员(一个绝对的数值,只计算一次)的应答。
- GET_N 等待n个应答,如果n大于成员的个数,可能会一直阻塞下去。
- GET_NONE 不等待应答,直接返回,即异步方式。
castMessage()方法的定义如下:
public RspList castMessage(Vector dests, Message msg, int mode, long timeout);
sendMessage()方法允许向一个成员发送消息,msg参数的目的地址不能为null。如果mode参数是GET_NONE,那么消息的发送变成异步方式;否则mode参数会被忽略(缺省采用GET_FIRST)。sendMessage()方法的定义如下:
public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException;
以下是个使用MessageDispatcher的例子:
import java.io.BufferedReader; import java.io.InputStreamReader; import org.jgroups.Channel; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.blocks.GroupRequest; import org.jgroups.blocks.MessageDispatcher; import org.jgroups.blocks.RequestHandler; import org.jgroups.util.RspList; public class MessageDispatcherTest { // private Channel channel; private MessageDispatcher dispatcher; private boolean propagateException = false; public void start() throws Exception { // channel = new JChannel(); dispatcher = new MessageDispatcher(channel, null, null, new RequestHandler() { public Object handle(Message msg) { System.out.println("got a message: " + msg); if(propagateException) { throw new RuntimeException("failed to handle message: " + msg.getObject()); } else { return new String("success"); } } }); channel.connect("MessageDispatcherTest"); // sendMessage(); // channel.close(); dispatcher.stop(); } private void sendMessage() throws Exception { boolean succeed = false; BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(System.in)); while(true) { System.out.print("> "); System.out.flush(); String line = br.readLine(); if(line != null && line.equals("exit")) { break; } else { Message msg = new Message(null, null, line); RspList rl = dispatcher.castMessage(null, msg, GroupRequest.GET_ALL, 0); System.out.println("Responses:\n" + rl); } } succeed = true; } finally { if(br != null) { try { br.close(); } catch (Exception e) { if(succeed) { throw e; } } } } } public static void main(String[] args) { try { new MessageDispatcherTest().start(); } catch (Exception e) { e.printStackTrace(); } } }
3.2 RpcDispatcher
RpcDispatcher 继承自MessageDispatcher,它允许远程调用集群中其它成员上的方法,并可选地等待应答。跟MessageDispatcher相比,不需要为RpcDispatcher指定RequestHandler。RpcDispatcher的构造函数接受一个Object server_obj参数,它是远程调用的目标对象。RpcDispatcher的callRemoteMethods系列方法用于远程调用目标对象上的方法,该方法可以由MethodCall指定,也可以通过方法名、参数类型指定。跟MessageDispatcher的castMessage()方法和sendMessage()方法类似,callRemoteMethods系列方法也接受一个int mode参数,其含义也相同。以下是个简单的例子:
import java.io.BufferedReader; import java.io.InputStreamReader; import org.jgroups.Channel; import org.jgroups.JChannel; import org.jgroups.blocks.GroupRequest; import org.jgroups.blocks.RpcDispatcher; import org.jgroups.util.RspList; public class RpcDispatcherTest { private Channel channel; private RpcDispatcher dispatcher; public int print(int number) throws Exception { return number * 2; } public void start() throws Exception { channel = new JChannel(); dispatcher = new RpcDispatcher(channel, null, null, this); channel.connect("RpcDispatcherTest"); // sendMessage(); // channel.close(); dispatcher.stop(); } private void sendMessage() throws Exception { boolean succeed = false; BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(System.in)); while(true) { System.out.print("> please input an int value:"); System.out.flush(); String line = br.readLine(); if(line != null && line.equals("exit")) { break; } else { int param = 0; try { param = Integer.parseInt(line); } catch(Exception e) { System.out.println("invalid input: " + line); continue; } RspList rl = dispatcher.callRemoteMethods(null, "print", new Object[]{new Integer(param)}, new Class[]{int.class}, GroupRequest.GET_ALL, 0); System.out.println("Responses: \n" + rl); } } succeed = true; } finally { if(br != null) { try { br.close(); } catch (Exception e) { if(succeed) { throw e; } } } } } public static void main(String[] args) { try { new RpcDispatcherTest().start(); } catch (Exception e) { e.printStackTrace(); } } }
3.3 ReplicatedHashMap
ReplicatedHashMap 继承自ConcurrentHashMap,并在内部使用了RpcDispatcher。ReplicatedHashMap构造函数的clustername参数指定了集群的名字,集群中所有的实例会包含相同的状态。新加入的实例在开始工作前会从集群中获得当前的状态。对实例的修改(例如通过put,remove方法)会传播到集群的其它实例中,只读的请求(例如get方法)则是本地调用。需要注意的是,ReplicatedHashMap的以下划线开头的方法是用于RpcDispatcher的远程调用的。在ReplicatedHashMap上可以注册 Notification,以便在实例的状态改变时进行回调,所有的回调也是本地的。以下是个简单的例子:
import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Iterator; import java.util.Map; import java.util.Vector; import org.jgroups.Address; import org.jgroups.ChannelFactory; import org.jgroups.JChannelFactory; import org.jgroups.View; import org.jgroups.blocks.ReplicatedHashMap; public class ReplicatedHashMapTest implements ReplicatedHashMap.Notification<String, String> { // private ReplicatedHashMap<String, String> map; public void start() throws Exception { ChannelFactory factory = new JChannelFactory(); map = new ReplicatedHashMap<String, String>("ReplicatedHashMapTest", factory, "udp.xml", false, 10000); map.addNotifier(this); sendMessage(); map.stop(); } public void entryRemoved(String key) { System.out.println("in entryRemoved(" + key + ")"); } public void entrySet(String key, String value) { System.out.println("in entrySet(" + key + "," + value + ")"); } public void contentsSet(Map<String, String> m) { System.out.println("in contentsSet(" + printMap(m) + ")"); } public void contentsCleared() { System.out.println("in contentsCleared()"); } public void viewChange(View view, Vector<Address> newMembers, Vector<Address> oldMembers) { System.out.println("in viewChange(" + view + ")"); } private void sendMessage() throws Exception { boolean succeed = false; BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(System.in)); while (true) { System.out.print("> "); System.out.flush(); String line = br.readLine(); if (line != null && line.equals("exit")) { break; } else { if (line.equals("show")) { System.out.println(printMap(map)); } else if (line.equals("clear")) { map.clear(); } else if (line.startsWith("remove ")) { String key = line.substring(line.indexOf(" ") + 1, line.length()).trim(); map.remove(key); } else if (line.startsWith("put ")) { line = line.replace("put ", ""); int index = line.indexOf("="); if (index <= 0 || index >= (line.length() - 1)) { System.out.println("invalid input"); continue; } String key = line.substring(0, index).trim(); String value = line.substring(index + 1, line.length()) .trim(); map.put(key, value); } else { System.out.println("invalid input: " + line); continue; } } } succeed = true; } finally { if (br != null) { try { br.close(); } catch (Exception e) { if (succeed) { throw e; } } } } } private String printMap(Map<String, String> m) { StringBuffer sb = new StringBuffer(); sb.append("["); for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) { String key = iter.next(); String value = map.get(key); sb.append(key).append("=").append(value); if (iter.hasNext()) { sb.append(","); } } sb.append("]"); return sb.toString(); } public static void main(String args[]) { try { new ReplicatedHashMapTest().start(); } catch (Exception e) { e.printStackTrace(); } } }
3.4 NotificationBus
NotificationBus 提供了向集群发送通知的能力,通知可以是任何可以被序列化的对象。NotificationBus在内部使用Channel,其start()和stop()方法用于启动和停止。NotificationBus的setConsumer()方法用于注册Consumer接口,其定义如下:
public interface Consumer { void handleNotification(Serializable n); Serializable getCache(); void memberJoined(Address mbr); void memberLeft(Address mbr); }
NotificationBus的getCacheFromCoordinator() 和getCacheFromMember()用于请求集群的状态。前者是从coordinator得到状态,后者从指定地址的成员处得到状态。NotificationBus上注册的Consumer需要实现getCache()方法以返回状态。以下是个简单的例子:
import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.Serializable; import java.util.Iterator; import java.util.LinkedList; import org.jgroups.Address; import org.jgroups.blocks.NotificationBus; public class NotificationBusTest implements NotificationBus.Consumer { // private NotificationBus bus; private LinkedList<Serializable> cache; public void handleNotification(Serializable n) { System.out.println("in handleNotification(" + n + ")"); if (cache != null) { cache.add(n); } } public Serializable getCache() { return cache; } public void memberJoined(Address mbr) { System.out.println("in memberJoined(" + mbr + ")"); } public void memberLeft(Address mbr) { System.out.println("in memberLeft(" + mbr + ")"); } @SuppressWarnings("unchecked") public void start() throws Exception { // bus = new NotificationBus("NotificationBusTest", null); bus.setConsumer(this); bus.start(); cache = (LinkedList<Serializable>) bus.getCacheFromCoordinator(3000, 1); if (cache == null) { cache = new LinkedList<Serializable>(); } System.out.println(printCache(cache)); // sendNotification(); // bus.stop(); } private void sendNotification() throws Exception { boolean succeed = false; BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(System.in)); while (true) { System.out.print("> "); System.out.flush(); String line = br.readLine(); if (line != null && line.equals("exit")) { break; } else { bus.sendNotification(line); } } succeed = true; } finally { if (br != null) { try { br.close(); } catch (Exception e) { if (succeed) { throw e; } } } } } private String printCache(LinkedList<Serializable> c) { StringBuffer sb = new StringBuffer(); sb.append("["); for (Iterator<Serializable> iter = c.iterator(); iter.hasNext();) { sb.append(iter.next()); if (iter.hasNext()) { sb.append(","); } } sb.append("]"); return sb.toString(); } public static void main(String[] args) { try { new NotificationBusTest().start(); } catch (Exception e) { e.printStackTrace(); } } }
发表评论
-
Understanding the Hash Array Mapped Trie
2012-03-30 10:36 0mark -
A Hierarchical CLH Queue Lock
2012-01-14 19:01 2152A Hierarchical CLH Queue Lock ( ... -
Inside AbstractQueuedSynchronizer (4)
2012-01-08 17:06 3526Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (3)
2012-01-07 23:37 4748Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (2)
2012-01-07 17:54 6370Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (1)
2012-01-06 11:04 7952Inside AbstractQueuedSynchroniz ... -
Code Optimization
2011-10-14 00:11 1612当前开发人员在进行编码的时候,可能很少关注纯粹代码级别的优化了 ... -
Distributed Lock
2011-08-02 22:02 92161 Overview 在分布式系统中,通常会 ... -
What's New on Java 7 Phaser
2011-07-29 10:15 82861 Overview Java 7的并 ... -
Sequantial Lock in Java
2011-06-07 17:00 22211 Overview Linux内核中常见的同步机 ... -
Feature or issue?
2011-04-26 22:23 121以下代码中,为何CglibTest.intercept ... -
Bloom Filter
2010-10-19 00:41 50791 Overview Bloom filt ... -
Inside java.lang.Enum
2010-08-04 15:40 64791 Introduction to enum J ... -
Open Addressing
2010-07-07 17:59 34791 Overview Open addressi ... -
JLine
2010-06-17 09:11 11014Overview JLine 是一个用来处理控 ... -
ID Generator
2010-06-14 14:45 1683关于ID Generator,想 ... -
inotify-java
2009-07-22 22:58 83111 Overview 最近公 ... -
Perf4J
2009-06-11 23:13 84931 Overview Perf4j是一个用于计算 ... -
Progress Estimator
2009-02-22 19:37 1537Jakarta Commons Cookbook这本书 ... -
jManage
2008-12-22 00:40 39581 Overview 由于项目需要, 笔者开发了一个 ...
相关推荐
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 ...
3. **Protocol Stack**: Jgroups的协议栈,由多个协议层组成,每个层负责特定的任务,如心跳检测、消息排序或故障检测。合理的配置可以优化集群性能和稳定性。 配置完成后,将Ehcache与Jgroups集成,主要涉及以下...
### JGroups教程:深入理解与应用 #### 一、安装JGroups JGroups是一款高性能、高可用性的集群通信中间件,适用于构建分布式系统。本文档将详细介绍如何安装配置JGroups,并编写一个简单的应用来演示其主要功能。 ...
jgroups.part3
Jgroups 中的 UNICAST3 协议详解 Jgroups 是一种基于 IP 多播的可靠的组播中间件,UNICAST3 协议是 Jgroups 中的一种单播协议,旨在保持单播和 UNICAST2 的正面特征,而修正负面特征。 UNICAST3 协议的主要特点是...
JGroups集群技术概述 JGroups是一个用于建立可靠的组播通信的工具包,它提供了灵活的、可定制的协议栈,以满足不同的需求。JGroups支持多种传输协议,包括UDP、TCP和JMS等。在JGroups中,消息传输可以保证可靠性,...
jgroups-raft 项目是 JGroups 框架对 Raft 的实现。Maven:<groupId>org.jgroups <artifactId>jgroups-raft <version>0.2</version>Raft 是一个容易理解的共识算法。在容错和性能方面它相当于 Paxos(Google 的一致...
"assembly-descriptors-1.2.8.zip"和"ehcache-jgroups3-replication.zip"这两个压缩包文件,显然与Ehcache和JGroups 3的集成有关。"ehcache-jgroups3-replication-master"可能包含了该项目的源代码,供开发者研究和...
jgroups-2.2.7.jar jgroups-2.2.7.jar