- 浏览: 485174 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
龘龘龘:
TrueBrian 写道有个问题,Sample 1中,为了控制 ...
What's New on Java 7 Phaser -
龘龘龘:
楼主总结的不错。
What's New on Java 7 Phaser -
TrueBrian:
有个问题,Sample 1中,为了控制线程的启动时机,博主实际 ...
What's New on Java 7 Phaser -
liguanqun811:
不知道楼主是否对zookeeper实现的分布式锁进行过性能测试 ...
Distributed Lock -
hobitton:
mysql的get lock有版本限制,否则get lock可 ...
Distributed Lock
这几天抽空看了看英文版的JMX in Action,本来对于JMX的理解限于一些点,读过之后感觉这些点终于连成网了。首先评论一下这本书的作者Benjamin G. Sullins和Mark B. Whipple。跟有些作者不同,这两位仁兄没有见龙卸甲,舍我其谁的风范,而是充分考虑的读者的智商和承受能力,把书写的比较浅显易懂(我觉得好的程序员在编写程序的时候,也应该具备这样的素质),有的地方甚至很唠叨。由于这本书出版的比较早(2003年),所以里面有些内容也过时了,例如书中对于RMIConnectorServer的介绍还限于SUN的参考实现,目前这部分内容已经成为标准,由JMXConnectorServer统一管理了。另外在这里抱怨一下Model MBean,虽然设计这个MBean的初衷可能是为了避免JMX的侵入性等,但是创建Model MBean的代码实在是有点丑陋。在JMX in Action中,作者介绍了使用JINI来发现MBean Agent的例子,虽然这是JINI的强项,但是个人感觉现在用Multicast作为发现机制的也不少。一时兴起,就写了个Multicast Discovery的例子。首先声明,以下代码没有经过严格测试,设计上可能也有局限。
首先定义一下各种接口:
import java.util.List; public interface DiscoveryAgent { List<DiscoveryService> getDiscoveryServices (); void registerService(DiscoveryService discoveryService); void addDiscoveryListener(DiscoveryListener listener); void removeDiscoveryListener(DiscoveryListener listener); }
public interface DiscoveryService { Object getServiceId(); byte[] toByteArray(); }
public interface DiscoveryServiceFactory { DiscoveryService valueOf(byte data[], int offset, int length); }
public interface DiscoveryListener { void onServiceAdd(DiscoveryAgent agent, DiscoveryService service); void onServiceRemove(DiscoveryAgent agent, DiscoveryService service); }
public class DiscoveryURI { // private String host; private int port; public DiscoveryURI() { } public DiscoveryURI(String host, int port) { this.host = host; this.port = port; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
其中DiscoveryAgent中的registerService方法用于注册一个DiscoveryService,这个DiscoveryService会通过multicast(或者其它方式)对网络上的其它DiscoveryAgent公开。当一个DiscoveryService被公开时,首先通过其toByteArray方法得到字节数组,然后将这个字节数组设置到DatagramPacket上,最终通过multicast发送到网络上。getDiscoveryServices方法用于返回目前DiscoveryAgent已经发现的所有DiscoveryService。当一个DiscoveryAgent收到一个DatagramPacket后,首先通过DiscoveryServiceFactory的valueOf方法创建一个DiscoveryService,然后再根据DiscoveryService上的serviceId对这个DiscoveryService进行进一步的处理。
接下来是DiscoveryAgent的实现类:MulticastDiscoveryAgent
import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { // private static Logger logger = Logger.getLogger(MulticastDiscoveryAgent.class); // private static final String DEFAULT_MULTICAST_HOST = "231.0.0.1"; private static final int DEFAULT_MULTICAST_PORT = 9697; private static final int DEFAULT_RECEIVE_BUFFER_SIZE = 8192; private static final int DEFAULT_HEART_BEAT_INTERVAL = 2000; private static final int DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH = 5; // private Thread worker; private long lastHeartBeatTime; private AtomicBoolean workerStarted; private volatile int heartBeatInterval; private volatile int heartBeatMissBeforeDeath; private volatile int receiveBufferSize; private MulticastSocket multicastSocket; // private DiscoveryURI discoveryURI; private DiscoveryService registeredService; private DiscoveryServiceFactory discoveryServiceFactory; private List<DiscoveryListener> discoveryListeners; private List<DiscoveryServiceWrapper> discoveryServices; /** * */ public MulticastDiscoveryAgent() { this.workerStarted = new AtomicBoolean(false); this.receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; this.heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL; this.heartBeatMissBeforeDeath = DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH; this.discoveryURI = new DiscoveryURI(DEFAULT_MULTICAST_HOST, DEFAULT_MULTICAST_PORT); this.discoveryListeners = Collections.synchronizedList(new ArrayList<DiscoveryListener>()); this.discoveryServices = Collections.synchronizedList(new ArrayList<DiscoveryServiceWrapper>()); } /** * */ public boolean isStarted() { return workerStarted.get(); } public boolean start() throws IOException { if (workerStarted.compareAndSet(false, true)) { // InetAddress group = InetAddress.getByName(discoveryURI.getHost()); multicastSocket = new MulticastSocket(discoveryURI.getPort()); multicastSocket.joinGroup(group); multicastSocket.setSoTimeout(heartBeatInterval); multicastSocket.setLoopbackMode(false); multicastSocket.setTimeToLive(1); // worker = new Thread(this); worker.setDaemon(true); worker.start(); // if(logger.isInfoEnabled()) { logger.info("multicast discovery agent started"); } // return true; } else { return false; } } public boolean stop() { if (workerStarted.compareAndSet(true, false)) { // TODO // if(logger.isInfoEnabled()) { logger.info("multicast discovery agent stopped"); } // return true; } else { return false; } } public void run() { try { while(workerStarted.get()) { send(); receive(); } } catch(Exception e) { logger.error("failed to run agent, detail: " + e.toString()); } finally { try { // discoveryServices.clear(); // InetAddress group = InetAddress.getByName(discoveryURI.getHost()); multicastSocket.leaveGroup(group); multicastSocket.close(); multicastSocket = null; } catch(Exception e) { logger.error("failed to close socket, detail: " + e.toString()); } } } private void send() throws IOException { // If got nothing to advertise, just return if(this.registeredService == null) { return; } // long now = System.currentTimeMillis(); if(this.heartBeatInterval + this.lastHeartBeatTime <= now) { this.lastHeartBeatTime = now; byte data[] = this.registeredService.toByteArray(); DatagramPacket packet = new DatagramPacket (data, data.length, InetAddress.getByName (discoveryURI.getHost()), discoveryURI.getPort()); multicastSocket.send(packet); } } private void receive() throws IOException { try { byte[] buffer = new byte[receiveBufferSize]; DatagramPacket packet = new DatagramPacket(buffer, 0, buffer.length); multicastSocket.receive(packet); if (packet.getLength() > 0) { // Parse the packet long now = System.currentTimeMillis(); DiscoveryService ds = discoveryServiceFactory.valueOf(packet.getData(), packet.getOffset(), packet.getLength()); // if(this.registeredService != null && this.registeredService.getServiceId().equals(ds.getServiceId())) { // Ignore } else { // Clean up the discovery services boolean gotIt = false; for(Iterator<DiscoveryServiceWrapper> iter = discoveryServices.iterator(); iter.hasNext(); ) { DiscoveryServiceWrapper dsw = iter.next(); // if(dsw.getDiscoverService().getServiceId().equals(ds.getServiceId())) { dsw.setLastHeartBeatTime(now); gotIt = true; } // Clean up if( dsw.getLastHeartBeatTime() + (getHeartBeatInterval() * getHeartBeatMissBeforeDeath()) < now) { // iter.remove(); // if(logger.isInfoEnabled()) { logger.info("discovery service: " + ds.getServiceId() + " is removed from the agent"); } // Notify listeners for(Iterator<DiscoveryListener> iter2 = discoveryListeners.iterator(); iter2.hasNext(); ) { iter2.next().onServiceRemove(this, dsw.getDiscoverService()); } } } // Add to discovered services if(!gotIt) { // DiscoveryServiceWrapper wrapper = new DiscoveryServiceWrapper(); wrapper.setLastHeartBeatTime(now); wrapper.setDiscoverService(ds); discoveryServices.add(wrapper); // if(logger.isInfoEnabled()) { logger.info("discovery service: " + ds.getServiceId() + " is added to the agent"); } // Notify listeners for(Iterator<DiscoveryListener> iter = discoveryListeners.iterator(); iter.hasNext(); ) { iter.next().onServiceAdd(this, ds); } } } } else { logger.warn("ignored a invalid packet"); } } catch(SocketTimeoutException ste) { // Ignore } } /** * */ public void registerService(DiscoveryService discoveryService) { this.registeredService = discoveryService; } public List<DiscoveryService> getDiscoveryServices() { List<DiscoveryService> r = new ArrayList<DiscoveryService>(); for(Iterator<DiscoveryServiceWrapper> iter = discoveryServices.iterator(); iter.hasNext(); ) { r.add(iter.next().getDiscoverService()); } return r; } public void addDiscoveryListener(DiscoveryListener listener) { if(listener != null && !discoveryListeners.contains(listener)) { discoveryListeners.add(listener); } } public void removeDiscoveryListener(DiscoveryListener listener) { if(listener != null) { discoveryListeners.remove(listener); } } /** * */ public DiscoveryURI getDiscoveryURI() { return discoveryURI; } public void setDiscoveryURI(DiscoveryURI discoveryURI) { this.discoveryURI = discoveryURI; } public DiscoveryServiceFactory getDiscoveryServiceFactory() { return discoveryServiceFactory; } public void setDiscoveryServiceFactory(DiscoveryServiceFactory discoveryServiceFactory) { this.discoveryServiceFactory = discoveryServiceFactory; } public int getReceiveBufferSize() { return receiveBufferSize; } public void setReceiveBufferSize(int receiveBufferSize) { this.receiveBufferSize = receiveBufferSize; } public int getHeartBeatInterval() { return heartBeatInterval; } public void setHeartBeatInterval(int heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; } public int getHeartBeatMissBeforeDeath() { return heartBeatMissBeforeDeath; } public void setHeartBeatMissBeforeDeath(int heartBeatMissBeforeDeath) { this.heartBeatMissBeforeDeath = heartBeatMissBeforeDeath; } /** * */ private static class DiscoveryServiceWrapper { private long lastHeartBeatTime; private DiscoveryService discoverService; public DiscoveryServiceWrapper() { } public long getLastHeartBeatTime() { return lastHeartBeatTime; } public void setLastHeartBeatTime(long lastHeartBeatTime) { this.lastHeartBeatTime = lastHeartBeatTime; } public DiscoveryService getDiscoverService() { return discoverService; } public void setDiscoverService(DiscoveryService discoverService) { this.discoverService = discoverService; } } }
最后是用于测试的代码了, 首先定制一下SimpleDiscoveryService和SimpleDiscoveryServiceFactory,然后执行MulticastDiscoveryAgentTest,从控制台上就可以看到各个agent已经发现的service。执行一段时间后,main函数里会停止mda2,再等一段时间(大概是10秒左右),mda2的公开service就会从mda1、mda3和mda4中移除。mda4上没有注册DiscoveryService,因此mda4也就不会对外DiscoveryService,它的目的就是发现其它agent公开的DiscoveryServcie。
public class SimpleDiscoveryService implements DiscoveryService { // private String serviceId; public SimpleDiscoveryService(String serviceId) { this.serviceId = serviceId; } public Object getServiceId() { return serviceId; } public byte[] toByteArray() { return serviceId.getBytes(); } }
public class SimpleDiscoveryServiceFactory implements DiscoveryServiceFactory { public DiscoveryService valueOf(byte[] data, int offset, int length) { String serviceId = new String(data, offset, length); return new SimpleDiscoveryService(serviceId); } }
import java.util.Iterator; import java.util.List; public class MulticastDiscoveryAgentTest { public static void main(String args[]) { try { // SimpleDiscoveryService sds1 = new SimpleDiscoveryService("service1@localhost"); MulticastDiscoveryAgent mda1 = new MulticastDiscoveryAgent(); mda1.registerService(sds1); mda1.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda1.start(); SimpleDiscoveryService sds2 = new SimpleDiscoveryService("service2@remotehost"); MulticastDiscoveryAgent mda2 = new MulticastDiscoveryAgent(); mda2.registerService(sds2); mda2.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda2.start(); SimpleDiscoveryService sds3 = new SimpleDiscoveryService("service3@anotherhost"); MulticastDiscoveryAgent mda3 = new MulticastDiscoveryAgent(); mda3.registerService(sds3); mda3.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda3.start(); MulticastDiscoveryAgent mda4 = new MulticastDiscoveryAgent(); mda4.registerService(null); mda4.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda4.addDiscoveryListener(new DiscoveryListener() { public void onServiceAdd(DiscoveryAgent agent, DiscoveryService service) { System.out.println("#on service add: " + service.getServiceId()); } public void onServiceRemove(DiscoveryAgent agent, DiscoveryService service) { System.out.println("#on service remove: " + service.getServiceId()); } }); mda4.start(); // int round = 0; while(true) { Thread.sleep(3000); // System.out.println("\n#round: " + round); System.out.println("services@agent1: " + getServiceIds(mda1)); System.out.println("services@agent2: " + getServiceIds(mda2)); System.out.println("services@agent3: " + getServiceIds(mda3)); System.out.println("services@agent4: " + getServiceIds(mda4)); // round++; if(round > 3 && mda2 != null) { mda2.stop(); mda2 = null; System.out.println("\n#agent2 stopped, time: " + System.currentTimeMillis()); } } } catch(Exception e) { e.printStackTrace(); } } private static String getServiceIds(MulticastDiscoveryAgent mda) { if(mda == null) { return ""; } StringBuffer r = new StringBuffer(); List<DiscoveryService> discoveryServices = mda.getDiscoveryServices(); for(Iterator<DiscoveryService>iter = discoveryServices.iterator(); iter.hasNext(); ) { DiscoveryService ds = iter.next(); r.append(ds.getServiceId()); if(iter.hasNext()) { r.append(","); } } return r.toString(); } }
通过这个例子,可以在JMX中方便地将connctor server的属性对外公开,以于查找发现。另外,使用DiscoveryService的模块可能会在DiscoveryAgent之前发现这个DiscoveryService已经失效,所以可以考虑在DiscoveryAgent接口上再增加一个方法,用于DiscoveryService的客户模块强制的将DiscoveryService从DiscoveryAgent中移除。
发表评论
-
Understanding the Hash Array Mapped Trie
2012-03-30 10:36 0mark -
A Hierarchical CLH Queue Lock
2012-01-14 19:01 2155A Hierarchical CLH Queue Lock ( ... -
Inside AbstractQueuedSynchronizer (4)
2012-01-08 17:06 3533Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (3)
2012-01-07 23:37 4763Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (2)
2012-01-07 17:54 6380Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (1)
2012-01-06 11:04 7956Inside AbstractQueuedSynchroniz ... -
Code Optimization
2011-10-14 00:11 1618当前开发人员在进行编码的时候,可能很少关注纯粹代码级别的优化了 ... -
Distributed Lock
2011-08-02 22:02 92261 Overview 在分布式系统中,通常会 ... -
What's New on Java 7 Phaser
2011-07-29 10:15 83041 Overview Java 7的并 ... -
Sequantial Lock in Java
2011-06-07 17:00 22301 Overview Linux内核中常见的同步机 ... -
Feature or issue?
2011-04-26 22:23 121以下代码中,为何CglibTest.intercept ... -
Bloom Filter
2010-10-19 00:41 50911 Overview Bloom filt ... -
Inside java.lang.Enum
2010-08-04 15:40 64921 Introduction to enum J ... -
Open Addressing
2010-07-07 17:59 34921 Overview Open addressi ... -
JLine
2010-06-17 09:11 11021Overview JLine 是一个用来处理控 ... -
ID Generator
2010-06-14 14:45 1693关于ID Generator,想 ... -
inotify-java
2009-07-22 22:58 83291 Overview 最近公 ... -
Perf4J
2009-06-11 23:13 84991 Overview Perf4j是一个用于计算 ... -
Progress Estimator
2009-02-22 19:37 1550Jakarta Commons Cookbook这本书 ... -
jManage
2008-12-22 00:40 39651 Overview 由于项目需要, 笔者开发了一个 ...
相关推荐
### Multicast Listener Discovery Version 2 (MLDv2) for IPv6 #### 一、简介 多播监听发现协议版本2(MLDv2)是为IPv6设计的一个标准跟踪协议,它允许IPv6路由器发现其直接连接链路上是否存在多播监听器(即希望...
然后,需要配置PXE服务器,包括设置Multicast Discovery选项,添加_boot server类型,选择Red Hat Linux Install启动类型,并添加客户端支持。 3. DHCP服务器配置 DHCP是一种网络协议,用于动态分配IP地址和其他...
The Simple Service Discovery Protocol (SSDP) provides a mechanism where by network clients, with little or... for multicast discovery support as well as server based notification and discovery routing.
本文将深入探讨组播的基本概念、工作机制以及关键技术点,特别是Cisco系统在组播领域的实践与理论,包括MSDP(Multicast Source Discovery Protocol)的详细讲解。 ### 组播概述 组播是一种允许一台或多台主机(组...
路由器负责将多播流量转发到正确的目标网络,这需要路由器支持多播路由协议,如PIM(Protocol Independent Multicast)或MSDP(Multicast Source Discovery Protocol)。 在实际应用中,例如在Multicast_IPv6.doc文...
这是小弟制作的MLDv2的pps檔,有兴趣的人可以叁考看看.
- **MSDP (Multicast Source Discovery Protocol)**: 用于发现和交换不同PIM域之间的组播源信息。 - **MBGP (Multicast BGP)**: 利用BGP扩展来传递组播路由信息。 #### 三、组播技术的关键组件 1. **IGMP ...
在Linux环境中,多播协议主要依赖于IP层的IGMP(Internet Group Management Protocol)和数据链路层的MLD(Multicast Listener Discovery)。IGMP用于主机与路由器之间的多播组成员管理,而MLD则是IPv6环境下的等价...
1. **设备发现服务**:通过UDP广播或Multicast DNS (mDNS) 实现,服务端会周期性发送ONVIF特定的发现消息,通知网络中的其他设备它是一个ONVIF服务器。同时,它也会监听响应,识别出网络中其他ONVIF设备。 2. **...
4. **应用层多播协议**:可能还会介绍如MBGP(Multicast Border Gateway Protocol)或MSDP(Multicast Source Discovery Protocol)等高级协议,它们在更大范围内支持多播服务。 5. **安全与性能考虑**:在多播应用...
5. **MSDP(Multicast Source Discovery Protocol)**:为了解决PIM-SM域间源发现的问题,MSDP被引入。书中会讲解其工作流程和配置步骤。 6. **QoS(Quality of Service)与组播**:组播服务通常对带宽和延迟有较高...
2. **UDP Multicast**:SSDP使用UDP多播来广播消息,因为这比单播更高效,可以同时到达网络上的多个设备。 3. **URN(Uniform Resource Name)**:在M-SEARCH请求中,设备使用特定的URN来标识自己为ONVIF设备。例如...
这通常涉及设置MLD(Multicast Listener Discovery,即IPv6下的IGMP)侦听器、VLAN和接口的多播策略,以及启用PIM协议。通过正确的交换机配置,可以确保多播流量只被传递到有需求的网络段,防止不必要的带宽消耗。 ...
9. **多播寻址(Multicast Address)**:WS-Discovery使用特定的多播地址(如239.255.255.250:3702)进行UDP广播,以便在整个网络中传播发现消息。 10. **异步编程**:WCF支持异步操作,这在处理大量并发请求或长...
例如,IGMP(Internet Group Management Protocol)在IPv4中管理多播组成员关系,而MLD(Multicast Listener Discovery)在IPv6中起相同作用。 3. **套接字编程**:在编程中实现多播,需要使用支持多播的套接字。在...
A novel and efficient source-path discovery and maintenance method for application layer multicast
对于IPv6,MLD(Multicast Listener Discovery)起着相同的作用。 3. **组播路由协议**:PIM(Protocol Independent Multicast)是常用的组播路由协议,包括PIM-Sparse Mode(PIM-SM)、PIM-Dense Mode(PIM-DM)和...
* MLD (Multicast listener discovery for IPv6). Aims to be compliant with RFC 2710. No support for MLDv2 * ND (Neighbor discovery and stateless address autoconfiguration for IPv6). Aims to be ...
其中,MDNS(多播DNS,Multicast DNS)是一种广泛使用的无中心服务发现协议,尤其适用于局域网环境。本文将深入探讨MDNS在Library Server Service Discovery中的应用及其技术细节。 MDNS是DNS(域名系统)协议的一...
MSDP(Multicast Source Discovery Protocol)是PIM-SM模式下的一个补充协议,用于不同PIM域之间共享组播源信息。通过MSDP,路由器能够知道其他域内的活跃组播源,以便构建完整的组播分发树。 综上所述,企业IP组播...