前些天看了些关于JGroups方面的资料,怕时间久了将之忘于脑后,觉得有必要记录一下,这样日后就不用再从头看啦。
现在有很多项目都使用JGroups做底层的通讯,知道的开源项目有JBoss Cache和OSCache用它做为底层支持来实现集群的,一定还有其他的项目也用到了,只不过我不知道而已了。
JGroups是一个可靠的组间通讯工具,进程可以加入一个通讯组,给组内所有的成员或单独的成员发送消息,同样,也可以从组中的成员处接收消息。系统会记录组的每一个成员,在新成员加入或是现有的成员离开或是崩溃时,会通知组内的其他成员,这样我们就不必自己去管理这些事情了。
要想加入一个组,并与组内其他的成员交互,必须建立一个Channel连接到组,同一个组内的所有成员使用相同的组名称。首先是创建一个Channel,可以直接实例化一个Channel的实现,这里用的是JChannel:
JChannel channel = new JChannel(props);
参数里指定Channel使用的协议栈,如果是空的,则使用默认的协议栈,位于JGroups包里的udp.xml。参数可以是一个以冒号分隔的字符串,或是一个XML文件,在XML文件里定义协议栈。
下面的是JGroups文档里给出的字符串的例子:
String props="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
"PING(timeout=3000;num_initial_members=6):" +
"FD(timeout=5000):" +
"VERIFY_SUSPECT(timeout=1500):" +
"pbcast.STABLE(desired_avg_gossip=10000):" +
"pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):" +
"UNICAST(timeout=5000;min_wait_time=2000):" +
"FRAG:" +
"pbcast.GMS(initial_mbrs_timeout=4000;join_timeout=5000;" +
"join_retry_timeout=2000;shun=false;print_local_addr=false)";
JChannel channel;
try {
channel=new JChannel(props);
}
catch(Exception ex) {
// channel creation failed
创建完之后,Channel现在处于未连接状态,需要通过connect方法将之连接到组,使其处于连接状态:
public void connect(String groupname) throws ChannelClosed;
它的参数就是要加入组的组名字,如果加入的组之前没有任何成员,则会自动创建一个组。
此时,Channel已处于连接状态,可以发送/接收消息了,发送消息的方法为:
public void send(Message msg) throws ChannelNotConnected, ChannelClosed;
public void send(Address dst, Address src, Object obj) throws ChannelNotConnected, ChannelClosed;
两个方法基本是一样的,只不过一个是直接提供一个消息,而另一个只是提供了消息的目的地,源,和消息内容,其实这个方法在内部也是通过第一个方法来实现的,在其内部,将提供的这三个参数组合成一个消息,再调用第一个方法,具体使用哪个方法,则看个人喜好和实际情况了。
消息的由消息的目的地,源,Flag,消息内容,Header组成。其中如果目的地为空,则认为是发给所有组成员的消息;源为空的话,在底层的协议将其放到网络上时,会自动的将本Channel的地址填充进去。Address则是组成员的地址,用于唯一的标识一个组成员的接口,JGroups提供了几种默认的实现。下面的例子是发送一条消息到组内所有的成员处:
Hashtable data; // any serializable data
try {
channel.send(null, null, data);
}
catch(Exception ex) {
// handle errors
再来一个发送到单独的组成员的例子:
Address receiver;
Hashtable data;
try {
receiver=channel.getView().getMembers().first();
channel.send(receiver, null, data);
}
catch(Exception ex) {
// handle errors
}
其中的channel.getView().getMembers().first()是指从Channel中取出当前的成员列表,再从中取出第一个成员。之后就可以将这个成员做为目的地来发送消息了。
可以发送消息,同样也可以接收消息:
public Object receive(long timeout) throws ChannelNotConnected, ChannelClosed, Timeout;
利用此方法可以取回多种消息,如普通的消息,View消息,等等。它的timeout参数则是指定超时的时间,如果设置为0时,而此时又没有新消息可以接收,此方法则会形成一个阻塞,在这一直等到有可用的消息为止;设置为大于0时,如果没有可用消息,超过此值后,会抛出一个Timeout异常。
下面的列表则是可以接收的消息的详细清单:
Message
View
SuspectEvent
BlockEvent
UnblockEvent
GetStateEvent
StreamingGetStateEvent
SetStateEvent
StreamingSetStateEvent
同样,也给出一个此方法的应用小例子:
Object obj;
Message msg;
View v;
obj=channel.receive(0); // wait forever
if(obj instanceof Message)
msg=(Message)obj;
else if(obj instanceof View)
v=(View)obj;
else
; // don't handle suspicions or blocks
receive()方法是Channel主动的去取消息,这种方式在现在的JGroups版本中已经不赞成被使用了,而替代方式则是通过setReceiver()方法向Channel注册一个监听器,在有消息到达的时候,自动的调用相应的方法来处理消息。
setReceiver()方法的参数是一个Receiver接口,此接口继承了MessageListener和MembershipListener,呵,看名字就知道这两个Listener是做什么的了。JGroups里提供了一个Receiver的Adapter:ReceiverAdapter,它只是为Receiver接口里的方法提供了一空的实现,可以让我们在自己的实现中只需实现关心的方法就OK了。下面是一个用注册Receiver的形式接收消息的实例:
JChannel ch=new JChannel();
ch.setReceiver(new ReceiverAdapter() {
public void receive(Message msg) {
System.out.println("received message " + msg);
}
public void viewAccepted(View new_view) {
System.out.println("received view " + new_view);
}
});
ch.connect("bla");
利用上面的的两种方式接收消息时,会将接收到的消息从消息队列中删除,如果只是想了解一下下一条消息,而又不想将它从消息队列中删除时,可以使用peek()方法,它的使用方式同receive()。
了解了如果发送和接收消息之后,现在Channel想转移到未连接状态啦,可以使用disconnect(),将Channel与组断开连接,这个时候如果再执行发送或接收消息的操作的话,那就等着接收异常吧,呵。
Channel处于未连接状态之后,可以重新连接到组,也可以通过close()方法关闭Channel,需要注意的是,执行了close()之后,就不能直接执行connect()方法来使Channel连接到组了,需要使用open()来将Channel重新打开,之后再能与组进行连接。
上面只是对JGroups的简单应用做一下整理,其实这只是JGroups的皮毛而已,JGroups还包含很多内容:状态传递,Building Blocks(OSCache使用的就是Building Blocks里的NotificationBus),还有协议栈等等好多内容,这些还得慢慢的继续学习啊。
分享到:
相关推荐
JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 ...
在Ehcache通过Jgroups进行集群配置时,首先需要理解Jgroups的配置文件——`jgroups.xml`。这个文件定义了集群中节点如何相互发现、通信以及故障检测的规则。配置文件中的关键元素包括: 1. **Transport**: 定义了...
JGroups的强大之处在于其灵活性和扩展性,能够满足不同场景下的需求。 总之,JGroups为开发者提供了构建复杂分布式系统的基石,通过学习和实践,可以充分发挥其潜力,实现高效稳定的分布式通信解决方案。
JGroups集群技术概述 JGroups是一个用于建立可靠的组播通信的工具包,它提供了灵活的、可定制的协议栈,以满足不同的需求。JGroups支持多种传输协议,包括UDP、TCP和JMS等。在JGroups中,消息传输可以保证可靠性,...
jgroups-2.2.7.jar jgroups-2.2.7.jar
jgroups-raft 项目是 JGroups 框架对 Raft 的实现。Maven:<groupId>org.jgroups <artifactId>jgroups-raft <version>0.2</version>Raft 是一个容易理解的共识算法。在容错和性能方面它相当于 Paxos(Google 的一致...
jgroups.part1
《JGROUPS集群框架源码分析之消息发送、处理、接收》 JGROUPS是一款强大的开源通信框架,专为构建高可用性、高容错性的分布式系统而设计。它提供了集群内的消息传递功能,允许节点间可靠地交换信息。本文将深入探讨...
《深入解析JGroups开源框架:基于belaban-JGroups-19d7183源代码》 JGroups是一个用于构建高可用性集群的Java框架,它提供了可靠的消息传递、组成员管理和故障检测等功能,广泛应用于分布式系统中。本文将基于bela...
JGroups中的组成员管理是其核心功能之一。PING协议用于节点间的发现和心跳检测,确保了组成员的实时更新。FD(Failure Detection)协议则负责检测并隔离故障节点,保证组内的健康运行。 四、消息传输与一致性 ...
JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说必须要使用IP Multicast,JGroups也可以使用TCP来实现)。其工作模式基于IP多播,但可以在可靠性和群组...
Jgroups 中的 UNICAST3 协议详解 Jgroups 是一种基于 IP 多播的可靠的组播中间件,UNICAST3 协议是 Jgroups 中的一种单播协议,旨在保持单播和 UNICAST2 的正面特征,而修正负面特征。 UNICAST3 协议的主要特点是...
《JGroups:构建高效可靠的组通信系统》 JGroups是一个用Java编程语言编写的开源库,专注于实现基于IP组播的高效、可配置的组通信协议栈。它为分布式系统提供了一种健壮且灵活的方式来实现节点间的通信,是构建大...
### 关于JGroups 2.5教程:安装与开发简易应用程序 #### 安装与配置JGroups **JGroups**是一款高性能、可扩展且高度可靠的群集通信库,旨在为分布式系统提供消息传递功能。本教程将深入探讨如何安装配置JGroups,...
JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说必须要使用IP Multicast,JGroups也可以使用TCP来实现)。其工作模式基于IP多播,但可以在可靠性和群组...
jgroups.part3
JGroups - A Framework for Group Communication in Java ======================================================== March 3, 1998 Bela Ban 4114 Upson Hall Cornell University Ithaca, NY 14853 bba@...
jgroups-2.6.8.GA.jar jgroups-2.6.8.GA.jar
Java多播通讯框架JGroups是Java开发者用于构建高可用、高性能和可伸缩的集群通信系统的重要工具。它提供了一套全面的协议栈,能够处理网络中的节点发现、消息传递、故障检测和恢复等问题,从而使得开发分布式应用变...