`
mengyang
  • 浏览: 266521 次
  • 性别: Icon_minigender_1
  • 来自: 福州
社区版块
存档分类
最新评论

JGroups简介和例子

阅读更多
   JGroups是一个组播通信工具,它可以:
  • 创建和删除一个组
  • 加入和离开某个组
  • 管理组成员关系,当有新的成员进入或存在的成员离开的时候会通知组内其它成员
  • 侦测和移除出现故障的组成员
  • 发送单播消息(unicast,point-to-point)
  • 发送广播消息(multicast,point-to-multipoint)

    JGroups的强大之处在于它有一个很灵活的协议栈,可以根据你的需要,随意的添加或删除某些功能。比如说,刚开始你使用IP广播发送你的消息,过了一会程序开始要求无损的消息传输,你可以添加NAKACK协议,它能确保接收方一定能收到你发送的消息。但是此时接收方收到的消息的顺序是不固定的,为了让接收顺序和发送顺序保持一致,你可以选择添加FIFO协议来确保一对收发者之间发送和接收的顺序。如果要确保组里所有成员的收发顺序,你可以添加TOTAL协议。再接下来,你可以添加GMS和FLUSH协议来维护组成员间的关系;FD协议可以进行故障检测;STATE_TRANSFER协议可以让新加入的组成员从已存在的成员中获取一致的状态;最后你还可以使用CRYPT协议来加密你发送的消息。

    下面开始演示一个聊天组的程序。我们建立一个聊天组,分别发送单播消息和广播消息,当组成员发生变化的时候,所有组成员自动获得新的组成员视图,每当聊天组中加入一个新的成员的时候,新成员先和已存在的组成员进行状态同步(获取聊天记录)。
public class GroupChat extends ExtendedReceiverAdapter{
	
	private JChannel channel;
	private List<String> msgList = new ArrayList<String>();	//模拟状态对象,保存的是本节点的收到的消息
	
	/**
	 * 在有节点向本节点请求状态的时候被调用
	 */
	@Override
	public byte[] getState() {
		byte[] state = null;
		synchronized(msgList){
			try {
				state = Util.objectToByteBuffer(msgList);
			} catch (Exception e) {
				e.printStackTrace();
			}
			return state;
		}
	}

	/**
	 * 在其他节点返回状态给本节点的时候被调用
	 */
	@Override
	public void setState(byte[] state) {
		synchronized(msgList){
			try {
				List<String> tmpList = (List<String>)Util.objectFromByteBuffer(state);
				msgList.clear();
				msgList.addAll(tmpList);
				System.err.println("===receive state:[");
				for(String msg : msgList){
					System.out.println(msg);
				}
				System.out.println("]");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 当有消息进来的时候被调用
	 */
	@Override
	public void receive(Message msg) {
		System.out.println(">>>new message receive from "+msg.getSrc()+":"+msg.getObject());
		msgList.add(msg.getSrc()+" send : "+ msg.getObject());
	}

	/**
	 * 当组成员发生变化的时候被调用
	 */
	@Override
	public void viewAccepted(View new_view) {
		System.out.println("***new view receiver:"+new_view);
	}
	
	public void start() throws ChannelException{
		//打开channel并指定配置文件
		channel = new JChannel("udp.xml");
		//指定组名,就可以创建或连接到广播组
		channel.connect("ChatGroup");
		//注册回调接口,使用"推"模式来接受广播信息
		channel.setReceiver(this);
		//查看当前组成员
		System.out.println("---current view:"+channel.getView());
		//状态同步,第一个参数为null则向协调者节点获取信息
		channel.getState(null, 1000);
	}
	
	public void close(){
		channel.close();
	}
	
	public void loopSendMessage(){
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		try {
			while(true){
				System.out.println("please input a string");
				String line = br.readLine();
				System.out.println("you put:"+line);
				if(line.equals("exit")){
					break;
				}else{
					String[] array = line.split(",");
					
					Address des = null;  	//接收方地址,为null代表发送广播消息
					Address src = null;		//发送方地址,为null代表自己的地址
					String msg = line;		//发送内容
					
					if(array.length == 3){
						des = new IpAddress(Integer.parseInt(array[0]));
						src = new IpAddress(Integer.parseInt(array[1]));
						msg = array[2];
					}else if(array.length == 2){
						des = new IpAddress(Integer.parseInt(array[0]));
						msg = array[1];
					}
					
					Message message = new Message(des, src, msg);
					//发送消息
					channel.send(message);
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ChannelNotConnectedException e) {
			e.printStackTrace();
		} catch (ChannelClosedException e) {
			e.printStackTrace();
		}
	}
	
	
	public static void main(String[] args) {
		GroupChat chat = new GroupChat();
		try {
			chat.start();
			chat.loopSendMessage();
			chat.close();
		} catch (ChannelException e) {
			e.printStackTrace();
		}
		
	}

}

  可以看到JGroups提供的API屏蔽了底层的通信机制,对于开发人员来说是完全透明的,要关注的只是消息的接受处理就可以了。
  对应的协议栈配置:
<config>
    <UDP
         mcast_group_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
         mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20000000"
         ucast_send_buf_size="640000"
         mcast_recv_buf_size="25000000"
         mcast_send_buf_size="640000"
         loopback="false"
         discard_incompatible_packets="true"
         max_bundle_size="64000"
         max_bundle_timeout="30"
         use_incoming_packet_handler="true"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         enable_bundling="true"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         use_concurrent_stack="true"

         thread_pool.enabled="true"
         thread_pool.min_threads="1"
         thread_pool.max_threads="25"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="false"
         thread_pool.queue_max_size="100"
         thread_pool.rejection_policy="Run"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE2 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD timeout="10000" max_tries="5"   shun="true"/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK max_xmit_size="60000"
                   use_mcast_xmit="false" gc_lag="0"
                   retransmit_timeout="300,600,1200,2400,4800"
                   discard_delivered_msgs="true"/>
    <UNICAST timeout="300,600,1200,2400,3600"/>
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="400000"/>
    <VIEW_SYNC avg_send_interval="60000"   />
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                join_retry_timeout="2000" shun="false"
                view_bundling="true"/>
    <FC max_credits="20000000"
                    min_threshold="0.10"/>
    <FRAG2 frag_size="60000"  />
    <!--pbcast.STREAMING_STATE_TRANSFER /-->
    <pbcast.STATE_TRANSFER  />
    <!-- pbcast.FLUSH  /-->
</config>
1
0
分享到:
评论

相关推荐

    java SWT编写JGroup局域网聊天程序

    本文通过一个具体的例子展示了如何使用 SWT 和 JGroups 实现一个简单的局域网聊天应用程序。在此过程中,我们不仅了解了 SWT 和 JGroups 的基本用法,还学习了如何处理 GUI 开发中常见的线程同步问题。这些知识对于...

    jsp代码这是jsp百例的部分例子(92例的lib)(之二)

    2. **jgroups-2.2.8.jar**:JGroups是一个用来创建高可靠性的分布式系统的开源项目。在JSP应用中,它可能用于实现集群通信或数据同步。 3. **xerces-2.6.2.jar**:Xerces是Apache软件基金会提供的一个XML解析器,...

    spring整合EhCache 的简单例子

    1. **EhCache简介** - EhCache 是一个广泛使用的Java缓存解决方案,支持本地内存缓存和分布式缓存。 - 它提供了线程安全、可配置的缓存策略,包括LRU(Least Recently Used)和LFU(Least Frequently Used)等。 ...

    ehCache 使用例子

    - **JGroups**:一个用于构建可靠集群的框架,支持TCP、UDP等多种协议,可以用来构建ehCache的分布式缓存。 分布式缓存配置需要在`ehcache.xml`中指定,并且在代码中进行相应的初始化。 7. **缓存更新和同步**:...

    ActiveMQ 消息队列

    以下是一个简单的JMS消息发送和接收的例子: ```java // 发送消息 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory....

    Tomcat通过自带的Cluster方式实现Session会话共享环境操作记录(个人精华版)

    在上面的例子中,使用了TCP协议,也可以选择其他如JGroups等协议。 3. **启用复制策略**:在Manager配置中,可以指定何时以及如何复制Session。例如,expireSessionsOnShutdown表示是否在服务器关闭时过期Session,...

    tomcat 集群案例

    6. **网络通信**:在Tomcat集群中,服务器之间需要通过网络进行通信,这可能涉及到TCP/IP协议、JGroups等技术,用于广播消息和同步状态。 7. **故障检测与恢复**:集群需要有机制检测成员的生死状态,一旦发现某台...

    Tomcat集群,Apache负载均衡

    3. **设置通信机制**: 例如使用JGroups进行节点间的通信,确保节点间的同步和故障检测。 接下来,我们探讨Apache作为负载均衡器的角色。Apache HTTP Server可以通过mod_proxy模块实现反向代理和负载均衡功能。它...

    swarmcache 缓存 入门 事例

    #### 一、SwarmCache简介 SwarmCache是一种分布式缓存系统,它主要用于在多台服务器之间共享缓存数据,以提高应用程序的性能和可扩展性。SwarmCache利用了JavaGroups进行节点间的通信,并通过多播(multicast)技术来...

    (2.0版本)自己写的struts2+hibernate+spring实例

    增加了页面分页和后台分页方法 (如果看了我第一个例子的朋友.应该看到我在service层的find方法上的注释.说当前版本因为原来使用find方法是从代理中取.而spring默认的数据持久话只能包含业务层和数据层.不能包含...

Global site tag (gtag.js) - Google Analytics