`
whitesock
  • 浏览: 483789 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

Multicast Discovery

    博客分类:
  • SE
阅读更多

   这几天抽空看了看英文版的JMX in Action,本来对于JMX的理解限于一些点,读过之后感觉这些点终于连成网了。首先评论一下这本书的作者Benjamin G. Sullins和Mark B. Whipple。跟有些作者不同,这两位仁兄没有见龙卸甲,舍我其谁的风范,而是充分考虑的读者的智商和承受能力,把书写的比较浅显易懂(我觉得好的程序员在编写程序的时候,也应该具备这样的素质),有的地方甚至很唠叨。由于这本书出版的比较早(2003年),所以里面有些内容也过时了,例如书中对于RMIConnectorServer的介绍还限于SUN的参考实现,目前这部分内容已经成为标准,由JMXConnectorServer统一管理了。另外在这里抱怨一下Model MBean,虽然设计这个MBean的初衷可能是为了避免JMX的侵入性等,但是创建Model MBean的代码实在是有点丑陋。在JMX in Action中,作者介绍了使用JINI来发现MBean Agent的例子,虽然这是JINI的强项,但是个人感觉现在用Multicast作为发现机制的也不少。一时兴起,就写了个Multicast Discovery的例子。首先声明,以下代码没有经过严格测试,设计上可能也有局限。

   首先定义一下各种接口:

import java.util.List;

public interface DiscoveryAgent {
	
	List<DiscoveryService> getDiscoveryServices ();
	
	void registerService(DiscoveryService discoveryService);
	
	void addDiscoveryListener(DiscoveryListener listener);
	
	void removeDiscoveryListener(DiscoveryListener listener);
}
public interface DiscoveryService {
	
	Object getServiceId();
	
	byte[] toByteArray();
}
public interface DiscoveryServiceFactory {
	
	DiscoveryService valueOf(byte data[], int offset, int length);
	
} 
public interface DiscoveryListener {
	
	void onServiceAdd(DiscoveryAgent agent, DiscoveryService service);
	
	void onServiceRemove(DiscoveryAgent agent, DiscoveryService service);
	
}
public class DiscoveryURI {
	//
	private String host;
	private int port;
	
	public DiscoveryURI() {
	}
	
	public DiscoveryURI(String host, int port) {
		this.host = host;
		this.port = port;
	}
	
	public String getHost() {
		return host;
	}
	
	public void setHost(String host) {
		this.host = host;
	}
	
	public int getPort() {
		return port;
	}
	
	public void setPort(int port) {
		this.port = port;
	}
}

   其中DiscoveryAgent中的registerService方法用于注册一个DiscoveryService,这个DiscoveryService会通过multicast(或者其它方式)对网络上的其它DiscoveryAgent公开。当一个DiscoveryService被公开时,首先通过其toByteArray方法得到字节数组,然后将这个字节数组设置到DatagramPacket上,最终通过multicast发送到网络上。getDiscoveryServices方法用于返回目前DiscoveryAgent已经发现的所有DiscoveryService。当一个DiscoveryAgent收到一个DatagramPacket后,首先通过DiscoveryServiceFactory的valueOf方法创建一个DiscoveryService,然后再根据DiscoveryService上的serviceId对这个DiscoveryService进行进一步的处理。   

   接下来是DiscoveryAgent的实现类:MulticastDiscoveryAgent

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;

public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
	//
	private static Logger logger = Logger.getLogger(MulticastDiscoveryAgent.class);
	
	//
	private static final String DEFAULT_MULTICAST_HOST = "231.0.0.1";
	private static final int DEFAULT_MULTICAST_PORT = 9697;
	private static final int DEFAULT_RECEIVE_BUFFER_SIZE = 8192;
	private static final int DEFAULT_HEART_BEAT_INTERVAL = 2000;
	private static final int DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH = 5;
	
	//
	private Thread worker;
	private long lastHeartBeatTime;
	private AtomicBoolean workerStarted;
	private volatile int heartBeatInterval;
	private volatile int heartBeatMissBeforeDeath;
	private volatile int receiveBufferSize;
	private MulticastSocket multicastSocket;
	
	//
	private DiscoveryURI discoveryURI;
	private DiscoveryService registeredService;
	private DiscoveryServiceFactory discoveryServiceFactory;
	private List<DiscoveryListener> discoveryListeners;
	private List<DiscoveryServiceWrapper> discoveryServices;
	
	
	/**
	 * 
	 */
	public MulticastDiscoveryAgent() {
		this.workerStarted = new AtomicBoolean(false);
		this.receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
		this.heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL;
		this.heartBeatMissBeforeDeath = DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH;
		this.discoveryURI = new DiscoveryURI(DEFAULT_MULTICAST_HOST, DEFAULT_MULTICAST_PORT);
		this.discoveryListeners = Collections.synchronizedList(new ArrayList<DiscoveryListener>());
		this.discoveryServices = Collections.synchronizedList(new ArrayList<DiscoveryServiceWrapper>());
	}
	
	/**
	 * 
	 */
	public boolean isStarted() {
		return workerStarted.get();
	}
	
	public boolean start() throws IOException {
		if (workerStarted.compareAndSet(false, true)) {
			//
			InetAddress group = InetAddress.getByName(discoveryURI.getHost());
			multicastSocket = new MulticastSocket(discoveryURI.getPort());
			multicastSocket.joinGroup(group);
			multicastSocket.setSoTimeout(heartBeatInterval);
			multicastSocket.setLoopbackMode(false);
			multicastSocket.setTimeToLive(1);
	        
			//
			worker = new Thread(this);
			worker.setDaemon(true);
			worker.start();
			
			//
			if(logger.isInfoEnabled()) {
				logger.info("multicast discovery agent started");
			}
			
			//
			return true;
		} else {
			return false;
		}
	}
	
	public boolean stop() {
		if (workerStarted.compareAndSet(true, false)) {
			// TODO
			
			//
			if(logger.isInfoEnabled()) {
				logger.info("multicast discovery agent stopped");
			}
			
			//
			return true;
		} else {
			return false;
		}
	}
	
	public void run() {
		try {
			while(workerStarted.get()) {
				send();
				receive();
			}
		} catch(Exception e) {
			logger.error("failed to run agent, detail: " + e.toString());
		} finally {
			try {
				//
				discoveryServices.clear();
				
				//
				InetAddress group = InetAddress.getByName(discoveryURI.getHost());
				multicastSocket.leaveGroup(group);
				multicastSocket.close();
				multicastSocket = null;
			} catch(Exception e) {
				logger.error("failed to close socket, detail: " + e.toString());
			}
		}
	}
	
	private void send() throws IOException {
		// If got nothing to advertise, just return
		if(this.registeredService == null) {
			return;
		}
		
		//
		long now = System.currentTimeMillis();
		if(this.heartBeatInterval + this.lastHeartBeatTime <= now) {
			this.lastHeartBeatTime = now;
			byte data[] = this.registeredService.toByteArray();
			DatagramPacket packet = new DatagramPacket (data, data.length, InetAddress.getByName (discoveryURI.getHost()), discoveryURI.getPort());
			multicastSocket.send(packet);
		}
	}
	
	private void receive() throws IOException {
		try {
			byte[] buffer = new byte[receiveBufferSize];
	        DatagramPacket packet = new DatagramPacket(buffer, 0, buffer.length);
	        multicastSocket.receive(packet);
	        if (packet.getLength() > 0) {
	        	// Parse the packet
	        	long now = System.currentTimeMillis();
	        	DiscoveryService ds = discoveryServiceFactory.valueOf(packet.getData(), packet.getOffset(), packet.getLength());
	        	
	        	//
	        	if(this.registeredService != null && this.registeredService.getServiceId().equals(ds.getServiceId())) {
	        		// Ignore
	        	} else {
	        		// Clean up the discovery services
		        	boolean gotIt = false;
		        	for(Iterator<DiscoveryServiceWrapper> iter = discoveryServices.iterator(); iter.hasNext(); ) {
		        		DiscoveryServiceWrapper dsw = iter.next();
		        		
		        		//
						if(dsw.getDiscoverService().getServiceId().equals(ds.getServiceId())) {
							dsw.setLastHeartBeatTime(now);
							gotIt = true;
						} 
						
						// Clean up
						if( dsw.getLastHeartBeatTime() + (getHeartBeatInterval() * getHeartBeatMissBeforeDeath()) < now) {
							//
							iter.remove();
							
							//
							if(logger.isInfoEnabled()) {
								logger.info("discovery service: " + ds.getServiceId() + " is removed from the agent");
							}
							
							// Notify listeners
			        		for(Iterator<DiscoveryListener> iter2 = discoveryListeners.iterator(); iter2.hasNext(); ) {
			        			iter2.next().onServiceRemove(this, dsw.getDiscoverService());
			        		}
						}
					}
		        	
		        	// Add to discovered services
		        	if(!gotIt) {
		        		//
		        		DiscoveryServiceWrapper wrapper = new DiscoveryServiceWrapper();
		        		wrapper.setLastHeartBeatTime(now);
		        		wrapper.setDiscoverService(ds);
		        		discoveryServices.add(wrapper);
		        		
		        		//
		        		if(logger.isInfoEnabled()) {
							logger.info("discovery service: " + ds.getServiceId() + " is added to the agent");
						}
		        		
		        		// Notify listeners
		        		for(Iterator<DiscoveryListener> iter = discoveryListeners.iterator(); iter.hasNext(); ) {
		        			iter.next().onServiceAdd(this, ds);
		        		}
		        	}
	        	}
	        } else {
	        	logger.warn("ignored a invalid packet");
	        }
		} catch(SocketTimeoutException ste) {
			// Ignore
		}
	}

	/**
	 * 
	 */
	public void registerService(DiscoveryService discoveryService) {
		this.registeredService = discoveryService;
	}
	
	public List<DiscoveryService> getDiscoveryServices() {
		List<DiscoveryService> r = new ArrayList<DiscoveryService>();
		for(Iterator<DiscoveryServiceWrapper> iter = discoveryServices.iterator(); iter.hasNext(); ) {
			r.add(iter.next().getDiscoverService());
		}
		return r;
	}
	
	public void addDiscoveryListener(DiscoveryListener listener) {
		if(listener != null && !discoveryListeners.contains(listener)) {
			discoveryListeners.add(listener);
		}
	}

	public void removeDiscoveryListener(DiscoveryListener listener) {
		if(listener != null) {
			discoveryListeners.remove(listener);
		}
	}
	
	/**
	 * 
	 */
	public DiscoveryURI getDiscoveryURI() {
		return discoveryURI;
	}

	public void setDiscoveryURI(DiscoveryURI discoveryURI) {
		this.discoveryURI = discoveryURI;
	}
	
	public DiscoveryServiceFactory getDiscoveryServiceFactory() {
		return discoveryServiceFactory;
	}

	public void setDiscoveryServiceFactory(DiscoveryServiceFactory discoveryServiceFactory) {
		this.discoveryServiceFactory = discoveryServiceFactory;
	}
	
	public int getReceiveBufferSize() {
		return receiveBufferSize;
	}

	public void setReceiveBufferSize(int receiveBufferSize) {
		this.receiveBufferSize = receiveBufferSize;
	}
	
	public int getHeartBeatInterval() {
		return heartBeatInterval;
	}

	public void setHeartBeatInterval(int heartBeatInterval) {
		this.heartBeatInterval = heartBeatInterval;
	}
	
	public int getHeartBeatMissBeforeDeath() {
		return heartBeatMissBeforeDeath;
	}

	public void setHeartBeatMissBeforeDeath(int heartBeatMissBeforeDeath) {
		this.heartBeatMissBeforeDeath = heartBeatMissBeforeDeath;
	}
	
	/**
	 * 
	 */
	private static class DiscoveryServiceWrapper {
		private long lastHeartBeatTime;
		private DiscoveryService discoverService;
		
		public DiscoveryServiceWrapper() {
			
		}

		public long getLastHeartBeatTime() {
			return lastHeartBeatTime;
		}

		public void setLastHeartBeatTime(long lastHeartBeatTime) {
			this.lastHeartBeatTime = lastHeartBeatTime;
		}

		public DiscoveryService getDiscoverService() {
			return discoverService;
		}

		public void setDiscoverService(DiscoveryService discoverService) {
			this.discoverService = discoverService;
		}
	}
}

   最后是用于测试的代码了, 首先定制一下SimpleDiscoveryService和SimpleDiscoveryServiceFactory,然后执行MulticastDiscoveryAgentTest,从控制台上就可以看到各个agent已经发现的service。执行一段时间后,main函数里会停止mda2,再等一段时间(大概是10秒左右),mda2的公开service就会从mda1、mda3和mda4中移除。mda4上没有注册DiscoveryService,因此mda4也就不会对外DiscoveryService,它的目的就是发现其它agent公开的DiscoveryServcie。

public class SimpleDiscoveryService implements DiscoveryService {
	//
	private String serviceId;
	
	public SimpleDiscoveryService(String serviceId) {
		this.serviceId = serviceId;
	}
	
	public Object getServiceId() {
		return serviceId;
	}

	public byte[] toByteArray() {
		return serviceId.getBytes();
	}
}
public class SimpleDiscoveryServiceFactory implements DiscoveryServiceFactory {

	public DiscoveryService valueOf(byte[] data, int offset, int length) {
		String serviceId = new String(data, offset, length);
		return new SimpleDiscoveryService(serviceId);
	}
}
import java.util.Iterator;
import java.util.List;

public class MulticastDiscoveryAgentTest {
	public static void main(String args[]) {
		try {
			//
			SimpleDiscoveryService sds1 = new SimpleDiscoveryService("service1@localhost");
			MulticastDiscoveryAgent mda1 = new MulticastDiscoveryAgent();
			mda1.registerService(sds1);
			mda1.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory());
			mda1.start();
			
			SimpleDiscoveryService sds2 = new SimpleDiscoveryService("service2@remotehost");
			MulticastDiscoveryAgent mda2 = new MulticastDiscoveryAgent();
			mda2.registerService(sds2);
			mda2.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory());
			mda2.start();
			
			SimpleDiscoveryService sds3 = new SimpleDiscoveryService("service3@anotherhost");
			MulticastDiscoveryAgent mda3 = new MulticastDiscoveryAgent();
			mda3.registerService(sds3);
			mda3.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory());
			mda3.start();
			
			MulticastDiscoveryAgent mda4 = new MulticastDiscoveryAgent();
			mda4.registerService(null);
			mda4.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory());
			mda4.addDiscoveryListener(new DiscoveryListener() {

				public void onServiceAdd(DiscoveryAgent agent, DiscoveryService service) {
					System.out.println("#on service add: " + service.getServiceId());
				}

				public void onServiceRemove(DiscoveryAgent agent, DiscoveryService service) {
					System.out.println("#on service remove: " + service.getServiceId());
				}
			});
			mda4.start();
			
			//
			int round = 0;
			while(true) {
				Thread.sleep(3000);
				
				//
				System.out.println("\n#round: " + round);
				System.out.println("services@agent1: " + getServiceIds(mda1));
				System.out.println("services@agent2: " + getServiceIds(mda2));
				System.out.println("services@agent3: " + getServiceIds(mda3));
				System.out.println("services@agent4: " + getServiceIds(mda4));
				
				//
				round++;
				if(round > 3 && mda2 != null) {
					mda2.stop();
					mda2 = null;
					System.out.println("\n#agent2 stopped, time: " + System.currentTimeMillis());
				}
			}
		} catch(Exception e) {
			e.printStackTrace();
		}
	}
	
	private static String getServiceIds(MulticastDiscoveryAgent mda) {
		if(mda == null) {
			return "";
		}
		
		StringBuffer r = new StringBuffer();
		List<DiscoveryService> discoveryServices = mda.getDiscoveryServices();
		for(Iterator<DiscoveryService>iter = discoveryServices.iterator(); iter.hasNext(); ) {
			DiscoveryService ds = iter.next();
			r.append(ds.getServiceId());
			if(iter.hasNext()) {
				r.append(",");
			}
		}
		return r.toString();
	}
}

   通过这个例子,可以在JMX中方便地将connctor server的属性对外公开,以于查找发现。另外,使用DiscoveryService的模块可能会在DiscoveryAgent之前发现这个DiscoveryService已经失效,所以可以考虑在DiscoveryAgent接口上再增加一个方法,用于DiscoveryService的客户模块强制的将DiscoveryService从DiscoveryAgent中移除。

4
0
分享到:
评论

相关推荐

    Multicast Listener Discovery Version 2(MLDv2) for IPv6

    ### Multicast Listener Discovery Version 2 (MLDv2) for IPv6 #### 一、简介 多播监听发现协议版本2(MLDv2)是为IPv6设计的一个标准跟踪协议,它允许IPv6路由器发现其直接连接链路上是否存在多播监听器(即希望...

    Windows环境下配置 PXE & DHCP

    然后,需要配置PXE服务器,包括设置Multicast Discovery选项,添加_boot server类型,选择Red Hat Linux Install启动类型,并添加客户端支持。 3. DHCP服务器配置 DHCP是一种网络协议,用于动态分配IP地址和其他...

    ssdp 协议规范 英文版

    The Simple Service Discovery Protocol (SSDP) provides a mechanism where by network clients, with little or... for multicast discovery support as well as server based notification and discovery routing.

    组播multicast

    本文将深入探讨组播的基本概念、工作机制以及关键技术点,特别是Cisco系统在组播领域的实践与理论,包括MSDP(Multicast Source Discovery Protocol)的详细讲解。 ### 组播概述 组播是一种允许一台或多台主机(组...

    Multicast_IPv6

    路由器负责将多播流量转发到正确的目标网络,这需要路由器支持多播路由协议,如PIM(Protocol Independent Multicast)或MSDP(Multicast Source Discovery Protocol)。 在实际应用中,例如在Multicast_IPv6.doc文...

    MLDv2 (Multicast_Listener_Discovery_v2)

    这是小弟制作的MLDv2的pps檔,有兴趣的人可以叁考看看.

    ns-2.1b8-mcast.tar.gz_Linux multicast_NS-2_multicast_multicast p

    在Linux环境中,多播协议主要依赖于IP层的IGMP(Internet Group Management Protocol)和数据链路层的MLD(Multicast Listener Discovery)。IGMP用于主机与路由器之间的多播组成员管理,而MLD则是IPv6环境下的等价...

    onvif_discovery

    1. **设备发现服务**:通过UDP广播或Multicast DNS (mDNS) 实现,服务端会周期性发送ONVIF特定的发现消息,通知网络中的其他设备它是一个ONVIF服务器。同时,它也会监听响应,识别出网络中其他ONVIF设备。 2. **...

    Writing.IP.Multicast-enabled.Applications

    4. **应用层多播协议**:可能还会介绍如MBGP(Multicast Border Gateway Protocol)或MSDP(Multicast Source Discovery Protocol)等高级协议,它们在更大范围内支持多播服务。 5. **安全与性能考虑**:在多播应用...

    Cisco Press: Interdomain Multicast Solutions Guide.chm

    5. **MSDP(Multicast Source Discovery Protocol)**:为了解决PIM-SM域间源发现的问题,MSDP被引入。书中会讲解其工作流程和配置步骤。 6. **QoS(Quality of Service)与组播**:组播服务通常对带宽和延迟有较高...

    onvif discovery

    2. **UDP Multicast**:SSDP使用UDP多播来广播消息,因为这比单播更高效,可以同时到达网络上的多个设备。 3. **URN(Uniform Resource Name)**:在M-SEARCH请求中,设备使用特定的URN来标识自己为ONVIF设备。例如...

    Cisco.Multicast.Routing.and.Switching.rar

    这通常涉及设置MLD(Multicast Listener Discovery,即IPv6下的IGMP)侦听器、VLAN和接口的多播策略,以及启用PIM协议。通过正确的交换机配置,可以确保多播流量只被传递到有需求的网络段,防止不必要的带宽消耗。 ...

    wcf ws-Discovery 全套源代码

    9. **多播寻址(Multicast Address)**:WS-Discovery使用特定的多播地址(如239.255.255.250:3702)进行UDP广播,以便在整个网络中传播发现消息。 10. **异步编程**:WCF支持异步操作,这在处理大量并发请求或长...

    FDL_multicast_通讯例程.zip

    例如,IGMP(Internet Group Management Protocol)在IPv4中管理多播组成员关系,而MLD(Multicast Listener Discovery)在IPv6中起相同作用。 3. **套接字编程**:在编程中实现多播,需要使用支持多播的套接字。在...

    A novel and efficient source-path discovery and maintenance method for application layer multicast

    A novel and efficient source-path discovery and maintenance method for application layer multicast

    最新LWIP源码

    * MLD (Multicast listener discovery for IPv6). Aims to be compliant with RFC 2710. No support for MLDv2 * ND (Neighbor discovery and stateless address autoconfiguration for IPv6). Aims to be ...

    library_server_service_discovery_mdns

    其中,MDNS(多播DNS,Multicast DNS)是一种广泛使用的无中心服务发现协议,尤其适用于局域网环境。本文将深入探讨MDNS在Library Server Service Discovery中的应用及其技术细节。 MDNS是DNS(域名系统)协议的一...

    企业IP组播技术专题.rar

    MSDP(Multicast Source Discovery Protocol)是PIM-SM模式下的一个补充协议,用于不同PIM域之间共享组播源信息。通过MSDP,路由器能够知道其他域内的活跃组播源,以便构建完整的组播分发树。 综上所述,企业IP组播...

    Multicast.zip_网络编程_Others_

    在Internet协议版本6(IPv6)中,组播成员管理协议由MLD(Multicast Listener Discovery)替代IGMP。尽管机制有所不同,但其核心功能保持一致,即让网络设备知道哪些接口应该接收特定组播流量。 除了基础的组播原理...

Global site tag (gtag.js) - Google Analytics