`

JGroup的初步学习

 
阅读更多

转自:http://blog.sina.com.cn/s/blog_605f5b4f01010dum.html

 

现在有很多项目都使用JGroups做底层的通讯,知道的开源项目有JBoss Cache和OSCache用它做为底层支持来实现集群的,一定还有其他的项目也用到了,只不过我不知道而已了。 

JGroups 适合使用场合
服务器集群cluster、多服务器通讯、服务器replication(复制)等,分布式cache缓存

JGroups 简介
JGroups是一个基于Java语言的提供可靠多播(组播)的开发工具包。在IP Multicast基础上提供可靠服务,也可以构建在TCP或者WAN上。主要是由Bela Ban开发,属于JBoss.org,在JBoss的网站也有一些相关文档。目前在 SourceForge上还是比较活跃,经常保持更新。


JGroups是一个可靠的组间通讯工具,进程可以加入一个通讯组,给组内所有的成员或单独的成员发送消息,同样,也可以从组中的成员处接收消息。系统会记录组的每一个成员,在新成员加入或是现有的成员离开或是崩溃时,会通知组内的其他成员,这样我们就不必自己去管理这些事情了。 

要想加入一个组,并与组内其他的成员交互,必须建立一个Channel连接到组,同一个组内的所有成员使用相同的组名称。首先是创建一个Channel,可以直接实例化一个Channel的实现,这里用的是JChannel:

 

JChannel channel = new JChannel(props);

 

参数里指定Channel使用的协议栈,如果是空的,则使用默认的协议栈,位于JGroups包里的udp.xml。参数可以是一个以冒号分隔的字符串,或是一个XML文件,在XML文件里定义协议栈。 

下面的是JGroups文档里给出的字符串的例子:

String props="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +  
        "PING(timeout=3000;num_initial_members=6):" +  
        "FD(timeout=5000):" +  
        "VERIFY_SUSPECT(timeout=1500):" +  
        "pbcast.STABLE(desired_avg_gossip=10000):" +  
        "pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):" +  
        "UNICAST(timeout=5000;min_wait_time=2000):" +  
        "FRAG:" +  
        "pbcast.GMS(initial_mbrs_timeout=4000;join_timeout=5000;" +  
        "join_retry_timeout=2000;shun=false;print_local_addr=false)";  
JChannel channel;  
try {  
    channel=new JChannel(props);  
}  
catch(Exception ex) {  
// channel creation failed  

 创建完之后,Channel现在处于未连接状态,需要通过connect方法将之连接到组,使其处于连接状态: 

 

public void connect(String groupname) throws ChannelClosed;  

 它的参数就是要加入组的组名字,如果加入的组之前没有任何成员,则会自动创建一个组。 

此时,Channel已处于连接状态,可以发送/接收消息了,发送消息的方法为: 

 

public void send(Message msg) throws ChannelNotConnected, ChannelClosed;  
public void send(Address dst, Address src, Object obj) throws ChannelNotConnected, Channel

 两个方法基本是一样的,只不过一个是直接提供一个消息,而另一个只是提供了消息的目的地,源,和消息内容,其实这个方法在内部也是通过第一个方法来实现的,在其内部,将提供的这三个参数组合成一个消息,再调用第一个方法,具体使用哪个方法,则看个人喜好和实际情况了。 


消息的由消息的目的地,源,Flag,消息内容,Header组成。其中如果目的地为空,则认为是发给所有组成员的消息;源为空的话,在底层的协议将其放到网络上时,会自动的将本Channel的地址填充进去。Address则是组成员的地址,用于唯一的标识一个组成员的接口,JGroups提供了几种默认的实现。下面的例子是发送一条消息到组内所有的成员处: 

 

Hashtable data; // any serializable data  
try {  
    channel.send(null, null, data);  
}  
catch(Exception ex) {  
// handle errors  

 再来一个发送到单独的组成员的例子:

Address receiver;  
Hashtable data;  
try {  
receiver=channel.getView().getMembers().first();  
channel.send(receiver, null, data);  
}  
catch(Exception ex) {  
// handle errors  
}  

 其中的channel.getView().getMembers().first()是指从Channel中取出当前的成员列表,再从中取出第一个成员。之后就可以将这个成员做为目的地来发送消息了。 


可以发送消息,同样也可以接收消息: 

public Object receive(long timeout) throws ChannelNotConnected, ChannelClosed, Timeout;  

 利用此方法可以取回多种消息,如普通的消息,View消息,等等。它的timeout参数则是指定超时的时间,果设置为0时,而此时又没有新消息可以接收,此方法则会形成一个阻塞,在这一直等到有可用的消息为止;设置为大于0时,如果没有可用消息,超过此值后,会抛出一个Timeout异常。 


下面的列表则是可以接收的消息的详细清单: 

Message  
View  
SuspectEvent  
BlockEvent  
UnblockEvent  
GetStateEvent  
StreamingGetStateEvent  
SetStateEvent  
StreamingSetStateEvent  

 同样,也给出一个此方法的应用小例子: 

Object obj;  
Message msg;  
View v;  
obj=channel.receive(0); // wait forever  
if(obj instanceof Message)  
msg=(Message)obj;  
else if(obj instanceof View)  
v=(View)obj;  
else  
; // don't handle suspicions or blocks  

 receive()方法是Channel主动的去取消息,这种方式在现在的JGroups版本中已经不赞成被使用了,而替代方式则是通过setReceiver()方法向Channel注册一个监听器,在有消息到达的时候,自动的调用相应的方法来处理消息。 


setReceiver()方法的参数是一个Receiver接口,此接口继承了MessageListener和MembershipListener,呵,看名字就知道这两个Listener是做什么的了。JGroups里提供了一个Receiver的Adapter:ReceiverAdapter,它只是为Receiver接口里的方法提供了一空的实现,可以让我们在自己的实现中只需实现关心的方法就OK了。下面是一个用注册Receiver的形式接收消息的实例:

JChannel ch=new JChannel();  
ch.setReceiver(new ReceiverAdapter() {  
    public void receive(Message msg) {  
        System.out.println("received message " + msg);  
    }  
    public void viewAccepted(View new_view) {  
        System.out.println("received view " + new_view);  
    }  
    });  
ch.connect("bla");  

 利用上面的的两种方式接收消息时,会将接收到的消息从消息队列中删除,如果只是想了解一下下一条消息,而又不想将它从消息队列中删除时,可以使用peek()方法,它的使用方式同receive()。

 
了解了如果发送和接收消息之后,现在Channel想转移到未连接状态啦,可以使用disconnect(),将Channel与组断开连接,这个时候如果再执行发送或接收消息的操作的话,那就等着接收异常吧,呵。 

Channel处于未连接状态之后,可以重新连接到组,也可以通过close()方法关闭Channel,需要注意的是,执行了close()之后,就不能直接执行connect()方法来使Channel连接到组了,需要使用open()来将Channel重新打开,之后再能与组进行连接。 

上面只是对JGroups的简单应用做一下整理,其实这只是JGroups的皮毛而已,JGroups还包含很多内容:状态传递,Building Blocks(OSCache使用的就是Building Blocks里的NotificationBus),还有协议栈等等好多内容,这些还得慢慢的继续学习啊。

转自:
http://www.iteye.com/topic/81783

 

JGroups使用例子, JGroups demo, Tim的hello world例子
Timreceiver.java

import org.jgroups.tests.perf.Receiver;
import org.jgroups.tests.perf.Transport;
import org.jgroups.util.Util;

public class TimReceiver implements Receiver {
private Transport transport = null;

public static void main(String[] args) {
TimReceiver t = new TimReceiver();
try {
int sendMsgCount = 5000;
int msgSize = 1000;
t.start();

t.sendMessages(sendMsgCount, msgSize);
System.out.println("########## Begin to recv...");
Thread.currentThread().join();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (t != null) {
t.stop();
}
}
}

public void start()
throws Exception {
transport = (Transport) new TimTransport();
transport.create(null);
transport.setReceiver(this);
transport.start();
}

public void stop() {
if (transport != null) {
transport.stop();
transport.destroy();
}
}

private int count = 0;
public void receive(Object sender, byte[] data) {
System.out.print(".");
if (++count == 5000) {
System.out.println("\r\nRECV DONE.");
System.exit(0);
}

}

private void sendMessages(int count, int msgSize)
throws Exception {
byte[] buf = new byte[msgSize];
for (int k = 0; k < msgSize; k++)
buf[k] = 'T';

System.out.println("-- sending " + count + " " + Util.printBytes(msgSize) + " messages");

for (int i = 0; i < count; i++) {
transport.send(null, buf);
}

System.out.println("######### send complete");
}
}

 TimTransport.java

import java.util.Map;
import java.util.Properties;

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.tests.perf.Receiver;
import org.jgroups.tests.perf.Transport;

public class TimTransport extends ReceiverAdapter implements Transport{
private JChannel channel = null;
private String groupName = "TimDemo";
private Receiver receiver = null;

String PROTOCOL_STACK_UDP1 = "UDP(bind_addr=192.168.100.59"; 
String PROTOCOL_STACK_UDP2 = ";mcast_port=8888";
String PROTOCOL_STACK_UDP3 = ";mcast_addr=225.1.1.1";
String PROTOCOL_STACK_UDP4 = ";tos=8;loopback=false;max_bundle_size=64000;" +
"use_incoming_packet_handler=true;use_outgoing_packet_handler=false;ip_ttl=2;enable_bundling=true):"
+ "PING:MERGE2:FD_SOCK:FD:VERIFY_SUSPECT:"
+"pbcast.NAKACK(gc_lag=50;max_xmit_size=50000;use_mcast_xmit=false;" +
"retransmit_timeout=300,600,1200,2400,4800;discard_delivered_msgs=true):"
+"UNICAST:pbcast.STABLE:VIEW_SYNC:"
+"pbcast.GMS(print_local_addr=false;join_timeout=3000;" +
"join_retry_timeout=2000;" +
"shun=true;view_bundling=true):"
+"FC(max_credits=2000000;min_threshold=0.10):FRAG2(frag_size=50000)";


public Object getLocalAddress() {
return channel != null ? channel.getLocalAddress() : null;
}

public void start() throws Exception {
channel.connect(groupName); 
}

public void stop() {
if (channel != null) {
channel.shutdown();
}
}

public void destroy() {
if (channel != null) {
channel.close();
channel = null;
}
}

public void setReceiver(Receiver r) {
this.receiver = r;
}

public Map dumpStats() {
return channel != null ? channel.dumpStats() : null;
}

public void send(Object destination, byte[] payload) throws Exception {
byte[] tmp = new byte[payload.length];
System.arraycopy(payload, 0, tmp, 0, payload.length);
Message msg = null;
msg = new Message((Address) destination, null, tmp);
if (channel != null) {
channel.send(msg);
}
}

public void receive(Message msg) {
Address sender = msg.getSrc();
byte[] payload = msg.getBuffer();
if (receiver != null) {
try {
receiver.receive(sender, payload);
} catch (Throwable tt) {
tt.printStackTrace();
}
}
}

public void create(Properties config) throws Exception {
String PROTOCOL_STACK = PROTOCOL_STACK_UDP1 + PROTOCOL_STACK_UDP2 + PROTOCOL_STACK_UDP3 + PROTOCOL_STACK_UDP4;
channel = new JChannel(PROTOCOL_STACK);
channel.setReceiver(this); 
}

public void send(Object destination, byte[] payload, boolean oob) throws Exception {
send(destination, payload);
}
}

 

分享到:
评论

相关推荐

    Jgroup学习总结

    **JGroup学习总结** JGroup是一个开源的Java框架,专门用于构建高可用、容错的分布式系统。它提供了一整套服务,包括组成员管理、消息传递、故障检测和恢复等,是许多分布式应用和中间件的基础。这篇博客将深入探讨...

    jgroup手册

    Reliable group communication with JGroups 3.x Preface This is the JGroups manual. It provides information about: 1. Installation and configuration 2. Using JGroups (the API) 3. Configuration of the ...

    jgroup使用实例

    JGroup是Java编程语言中的一款强大且灵活的集群通信库,专为构建高可用性、高性能的分布式系统而设计。它的核心目标是提供可靠的消息传递,确保数据在多个节点之间的一致性和完整性。本实例将深入讲解如何使用JGroup...

    jgroup master

    ### JGroups:可靠的组通信 #### 概览与核心概念 **JGroups**是一个用于创建分布式应用程序的Java库,它提供了可靠、高效的组播通信功能。JGroups支持多种传输层协议,包括TCP、UDP和多播等,并允许开发人员自定义...

    jgroup代码

    根据提供的文档内容,本文将对“jgroup代码”的安装步骤及如何编写一个简单的应用进行详细的阐述与解析。 ### 一、jgroup代码简介 JGroups是一个高性能、可扩展且易于使用的分布式通信库,用于实现集群中的节点...

    EHCACHE集群配置-JGroup篇

    EHCAHCE基于JGROUP的集群配置方案,内含相关配置文件,及配置说明

    java SWT编写JGroup局域网聊天程序

    ### Java SWT 编写 JGroup 局域网聊天程序知识点...在此过程中,我们不仅了解了 SWT 和 JGroups 的基本用法,还学习了如何处理 GUI 开发中常见的线程同步问题。这些知识对于开发类似的分布式系统或网络应用非常有用。

    《jgroup in action》

    根据提供的信息,《jgroup in action》是一本关于JGroups工具包的书籍,它详细介绍了如何使用JGroups进行可靠的多播通信。JGroups是强大的UUP(User-level UDP)开源组件,已被JBoss采用,用于底层通信。下面我们将...

    JGROUPS集群框架源码分析之消息发送、处理、接收

    《JGROUPS集群框架源码分析之消息发送、处理、接收》 JGROUPS是一款强大的开源通信框架,专为构建高可用性、高容错性的分布式系统而设计。它提供了集群内的消息传递功能,允许节点间可靠地交换信息。...

    jgroup笔记.doc

    JGroup 是一个强大的开源库,专门用于构建集群通信系统。它的主要目标是在集群内部实现可靠的消息传递,确保数据的一致性和高可用性。与 Java Message Service (JMS) 不同,JGroup 更专注于消息传递,而不是队列和...

    jboss jdbc json jgroup.jar

    在IT行业中,JBoss、JDBC、JSON和JGroup是四个关键的概念,它们在不同的领域发挥着重要作用。这里,我们将深入探讨这些技术及其在实际应用中的相关知识点。 首先,JBoss是一个开源的应用服务器,它是Java EE(企业...

    jgroup-3.0.1

    《JGroup-3.0.1:构建高效集群通信的核心技术》 JGroup是一个开源的Java框架,专门用于构建高可用、高性能的集群系统。它提供了健壮的组通信服务,包括成员资格管理、消息传递、故障检测以及一致性算法等。在版本...

    jgroup配置[收集].pdf

    《JGroup配置详解》 JGroup是一个开源的Java框架,专门用于构建可靠的消息传递系统,尤其是在分布式计算环境中。本文将深入解析JGroup的协议栈配置,重点探讨传输协议和可靠消息传递机制。 **4.1 传输协议** 传输...

    jgroup配置[归类].pdf

    在软件开发领域,JGroup是一个关键的组件,用于构建可靠的消息传递和组通信系统。JGroup的核心在于其协议栈,它由一系列的协议层组成,这些协议层共同负责消息的发送、接收、可靠传输以及组成员发现。本文将详细解析...

    GroupData:使用JGroup实现分布式数据结构(堆栈和集合)

    使用JGroup实现分布式数据结构(堆栈和集合) 介绍 [什么是JGroups?]( ) [JGroup入门]( ) JGroups是完全用Java编写的可靠的组通信工具包。 它基于IP多播(也支持TCP),但是有一些特殊功能,例如可靠性和组...

    Jgroups-all.jar

    JGroup是当前被广泛使用的可靠组间通信的工具之一。例如OSCache以及JBossTreeCache都是用的是JGroup。 JGroup功能十分强大,通过配置各种参数就可以充分利用它所提供的各项功能。JGroup最大的特点就是支持协议栈的...

    基于JGROUPS的ehcache的分布式缓存复制

    在IT行业中,分布式缓存是一种优化高并发场景下数据访问性能的重要技术,它通过在网络中的多台服务器上分发数据来提高系统的响应速度和可...通过学习和实践这一技术,开发者可以更好地应对高并发环境下的数据处理挑战。

    jGroup-OOPProjekt

    jGroup-OOPProjekt rocc 包含整个项目。 在 rocc 中还包含两个 bat 文件。 “reBuild”文件重建项目,另一个“run”运行项目。 RuinsOfCorrosaCity.zip 是可玩游戏及其脚本的包。 文档包含 RAD 和 SDD 会议包含...

    encache+jgroups集群缓存共享

    在IT行业中,缓存共享是提高系统性能和可伸缩性的重要手段,特别是在分布式系统中。"encache+jgroups集群缓存共享"这个主题聚焦于如何利用EnCache和JGroups两个技术来实现高效的集群间缓存共享。...

Global site tag (gtag.js) - Google Analytics