JGroups是一个组播通信工具,它可以:
- 创建和删除一个组
- 加入和离开某个组
- 管理组成员关系,当有新的成员进入或存在的成员离开的时候会通知组内其它成员
- 侦测和移除出现故障的组成员
- 发送单播消息(unicast,point-to-point)
- 发送广播消息(multicast,point-to-multipoint)
JGroups的强大之处在于它有一个很灵活的协议栈,可以根据你的需要,随意的添加或删除某些功能。比如说,刚开始你使用IP广播发送你的消息,过了一会程序开始要求无损的消息传输,你可以添加NAKACK协议,它能确保接收方一定能收到你发送的消息。但是此时接收方收到的消息的顺序是不固定的,为了让接收顺序和发送顺序保持一致,你可以选择添加FIFO协议来确保一对收发者之间发送和接收的顺序。如果要确保组里所有成员的收发顺序,你可以添加TOTAL协议。再接下来,你可以添加GMS和FLUSH协议来维护组成员间的关系;FD协议可以进行故障检测;STATE_TRANSFER协议可以让新加入的组成员从已存在的成员中获取一致的状态;最后你还可以使用CRYPT协议来加密你发送的消息。
下面开始演示一个聊天组的程序。我们建立一个聊天组,分别发送单播消息和广播消息,当组成员发生变化的时候,所有组成员自动获得新的组成员视图,每当聊天组中加入一个新的成员的时候,新成员先和已存在的组成员进行状态同步(获取聊天记录)。
public class GroupChat extends ExtendedReceiverAdapter{
private JChannel channel;
private List<String> msgList = new ArrayList<String>(); //模拟状态对象,保存的是本节点的收到的消息
/**
* 在有节点向本节点请求状态的时候被调用
*/
@Override
public byte[] getState() {
byte[] state = null;
synchronized(msgList){
try {
state = Util.objectToByteBuffer(msgList);
} catch (Exception e) {
e.printStackTrace();
}
return state;
}
}
/**
* 在其他节点返回状态给本节点的时候被调用
*/
@Override
public void setState(byte[] state) {
synchronized(msgList){
try {
List<String> tmpList = (List<String>)Util.objectFromByteBuffer(state);
msgList.clear();
msgList.addAll(tmpList);
System.err.println("===receive state:[");
for(String msg : msgList){
System.out.println(msg);
}
System.out.println("]");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 当有消息进来的时候被调用
*/
@Override
public void receive(Message msg) {
System.out.println(">>>new message receive from "+msg.getSrc()+":"+msg.getObject());
msgList.add(msg.getSrc()+" send : "+ msg.getObject());
}
/**
* 当组成员发生变化的时候被调用
*/
@Override
public void viewAccepted(View new_view) {
System.out.println("***new view receiver:"+new_view);
}
public void start() throws ChannelException{
//打开channel并指定配置文件
channel = new JChannel("udp.xml");
//指定组名,就可以创建或连接到广播组
channel.connect("ChatGroup");
//注册回调接口,使用"推"模式来接受广播信息
channel.setReceiver(this);
//查看当前组成员
System.out.println("---current view:"+channel.getView());
//状态同步,第一个参数为null则向协调者节点获取信息
channel.getState(null, 1000);
}
public void close(){
channel.close();
}
public void loopSendMessage(){
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
try {
while(true){
System.out.println("please input a string");
String line = br.readLine();
System.out.println("you put:"+line);
if(line.equals("exit")){
break;
}else{
String[] array = line.split(",");
Address des = null; //接收方地址,为null代表发送广播消息
Address src = null; //发送方地址,为null代表自己的地址
String msg = line; //发送内容
if(array.length == 3){
des = new IpAddress(Integer.parseInt(array[0]));
src = new IpAddress(Integer.parseInt(array[1]));
msg = array[2];
}else if(array.length == 2){
des = new IpAddress(Integer.parseInt(array[0]));
msg = array[1];
}
Message message = new Message(des, src, msg);
//发送消息
channel.send(message);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (ChannelNotConnectedException e) {
e.printStackTrace();
} catch (ChannelClosedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
GroupChat chat = new GroupChat();
try {
chat.start();
chat.loopSendMessage();
chat.close();
} catch (ChannelException e) {
e.printStackTrace();
}
}
}
可以看到JGroups提供的API屏蔽了底层的通信机制,对于开发人员来说是完全透明的,要关注的只是消息的接受处理就可以了。
对应的协议栈配置:
<config>
<UDP
mcast_group_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
mcast_port="${jgroups.udp.mcast_port:45588}"
tos="8"
ucast_recv_buf_size="20000000"
ucast_send_buf_size="640000"
mcast_recv_buf_size="25000000"
mcast_send_buf_size="640000"
loopback="false"
discard_incompatible_packets="true"
max_bundle_size="64000"
max_bundle_timeout="30"
use_incoming_packet_handler="true"
ip_ttl="${jgroups.udp.ip_ttl:2}"
enable_bundling="true"
enable_diagnostics="true"
thread_naming_pattern="cl"
use_concurrent_stack="true"
thread_pool.enabled="true"
thread_pool.min_threads="1"
thread_pool.max_threads="25"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="false"
thread_pool.queue_max_size="100"
thread_pool.rejection_policy="Run"
oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="Run"/>
<PING timeout="2000"
num_initial_members="3"/>
<MERGE2 max_interval="30000"
min_interval="10000"/>
<FD_SOCK/>
<FD timeout="10000" max_tries="5" shun="true"/>
<VERIFY_SUSPECT timeout="1500" />
<BARRIER />
<pbcast.NAKACK max_xmit_size="60000"
use_mcast_xmit="false" gc_lag="0"
retransmit_timeout="300,600,1200,2400,4800"
discard_delivered_msgs="true"/>
<UNICAST timeout="300,600,1200,2400,3600"/>
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="400000"/>
<VIEW_SYNC avg_send_interval="60000" />
<pbcast.GMS print_local_addr="true" join_timeout="3000"
join_retry_timeout="2000" shun="false"
view_bundling="true"/>
<FC max_credits="20000000"
min_threshold="0.10"/>
<FRAG2 frag_size="60000" />
<!--pbcast.STREAMING_STATE_TRANSFER /-->
<pbcast.STATE_TRANSFER />
<!-- pbcast.FLUSH /-->
</config>
分享到:
相关推荐
本文通过一个具体的例子展示了如何使用 SWT 和 JGroups 实现一个简单的局域网聊天应用程序。在此过程中,我们不仅了解了 SWT 和 JGroups 的基本用法,还学习了如何处理 GUI 开发中常见的线程同步问题。这些知识对于...
2. **jgroups-2.2.8.jar**:JGroups是一个用来创建高可靠性的分布式系统的开源项目。在JSP应用中,它可能用于实现集群通信或数据同步。 3. **xerces-2.6.2.jar**:Xerces是Apache软件基金会提供的一个XML解析器,...
1. **EhCache简介** - EhCache 是一个广泛使用的Java缓存解决方案,支持本地内存缓存和分布式缓存。 - 它提供了线程安全、可配置的缓存策略,包括LRU(Least Recently Used)和LFU(Least Frequently Used)等。 ...
- **JGroups**:一个用于构建可靠集群的框架,支持TCP、UDP等多种协议,可以用来构建ehCache的分布式缓存。 分布式缓存配置需要在`ehcache.xml`中指定,并且在代码中进行相应的初始化。 7. **缓存更新和同步**:...
以下是一个简单的JMS消息发送和接收的例子: ```java // 发送消息 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory....
在上面的例子中,使用了TCP协议,也可以选择其他如JGroups等协议。 3. **启用复制策略**:在Manager配置中,可以指定何时以及如何复制Session。例如,expireSessionsOnShutdown表示是否在服务器关闭时过期Session,...
6. **网络通信**:在Tomcat集群中,服务器之间需要通过网络进行通信,这可能涉及到TCP/IP协议、JGroups等技术,用于广播消息和同步状态。 7. **故障检测与恢复**:集群需要有机制检测成员的生死状态,一旦发现某台...
3. **设置通信机制**: 例如使用JGroups进行节点间的通信,确保节点间的同步和故障检测。 接下来,我们探讨Apache作为负载均衡器的角色。Apache HTTP Server可以通过mod_proxy模块实现反向代理和负载均衡功能。它...
#### 一、SwarmCache简介 SwarmCache是一种分布式缓存系统,它主要用于在多台服务器之间共享缓存数据,以提高应用程序的性能和可扩展性。SwarmCache利用了JavaGroups进行节点间的通信,并通过多播(multicast)技术来...
增加了页面分页和后台分页方法 (如果看了我第一个例子的朋友.应该看到我在service层的find方法上的注释.说当前版本因为原来使用find方法是从代理中取.而spring默认的数据持久话只能包含业务层和数据层.不能包含...