`
assertmyself
  • 浏览: 29706 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

组播发现服务器的java实现

阅读更多
组播发现服务器的一个示例


发现服务器
DiscoverServer
package com.gbcom.ccsv3.transport.multidiscover;

import org.apache.log4j.Logger;

/**
 * 发现服务器
 * 
 * <p>
 * 
 * @author syz
 *         <p>
 * @date 2015-6-26,下午04:23:36
 *       <p>
 * @version v1.0.0
 *          <p>
 * @see com.gbcom.ccsv3.transport.multidiscover.DiscoverServer
 */
public class DiscoverServer {
	private static final Logger LOGGER = Logger.getLogger(DiscoverServer.class);
	
	private static class DiscoverServerHolder{
		private static final DiscoverServer INSTANCE = new DiscoverServer();
	}
	/**
	 * 获取单例
	 * 
	 * @return DiscoverServer
	 */
	public static DiscoverServer getInstance() {
		return DiscoverServerHolder.INSTANCE;
	}

	private boolean started = false;
	private Receiver discover;

	private DiscoverServer() {
		discover = UdpDiscoverFactory.getMultiUdpDiscover();
	}

	/**
	 * 开
	 */
	public void on() {
		started = true;

		Thread t = new Thread(new Runnable() {

			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					discover.start();
				} catch (Exception e) {
					e.printStackTrace();
					discover = null;
					LOGGER.error("start discover server unknoe host", e);
					started = false;

					// maybe throws new exception.
				}
			}

		});
		t.start();
		LOGGER.info("start discover server  for device success!!!!");
	}

	/**
	 * 关
	 */
	public void off() {
		if (discover != null) {
			discover.stop();
		}
		started = false;
	}

	/**
	 * 是否开启
	 * 
	 * @return started
	 */
	public boolean isStarted() {
		return started;
	}

	static class UdpDiscoverFactory {
		/**
		 * 获取多播发现者
		 * 
		 * @return Receiver
		 */
		public static Receiver getMultiUdpDiscover() {
			return new MultiReceiver();
		}

		/**
		 * 获取单播发现者
		 * 
		 * @return Receiver
		 */
		public static Receiver getUdpDiscover() {
			return new UniReceiver();
		}
	}

}




接受者
接口
package com.gbcom.ccsv3.transport.multidiscover;

import java.net.UnknownHostException;

/**
 * UDP 发现
 * 
 * <p>
 * 
 * @author syz
 *         <p>
 * @date 2015-6-26,下午04:39:06
 *       <p>
 * @version v1.0.0
 *          <p>
 * @see com.gbcom.ccsv3.transport.multidiscover.Receiver
 */
public interface Receiver {

	/**
	 * 开始
	 * 
	 * @throws UnknownHostException
	 *             Exception
	 */
	public void start() throws UnknownHostException;

	/**
	 * 停止
	 */
	public void stop();

	/**
	 * 是否开启
	 * 
	 * @return Boolean
	 */
	public boolean isStarted();

}


组播的实现

package com.gbcom.ccsv3.transport.multidiscover;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.util.Date;

import org.apache.log4j.Logger;

/**
 * 多播发现者
 * 
 * * 组播 通信端口 1107 单播 通信端口1108
 * 
 * <p>
 * 
 * @author syz
 *         <p>
 * @date 2015-6-26,下午04:25:33
 *       <p>
 * @version v1.0.0
 *          <p>
 * @see com.gbcom.ccsv3.transport.multidiscover.MultiReceiver
 */
public class MultiReceiver implements Receiver {
	private static final Logger LOGGER = Logger.getLogger(MultiReceiver.class);

	/**
	 * 多播ip
	 */
	public static final String MULTI_GROUP_IP = "224.7.11.3";
	/**
	 * 多播端口
	 */
	public static final int MULTI_GROUP_PORT = 1107;
	private MulticastSocket msr = null;
	private InetAddress group = null;

	private boolean started = false;

	/**
	 * 开始
	 * 
	 * @throws UnknownHostException
	 *             Exception
	 */
	@Override
	public void start() throws UnknownHostException {
		// 创建多播socket
		// 接收报文
		this.group = InetAddress.getByName(MULTI_GROUP_IP);// 组播地址

		try {
			msr = new MulticastSocket(MULTI_GROUP_PORT); // server bind port
			//java.net.SocketException: No such device
//	        at java.net.PlainDatagramSocketImpl.join(Native Method)
//	        at java.net.PlainDatagramSocketImpl.join(PlainDatagramSocketImpl.java:181)
//	        at java.net.MulticastSocket.joinGroup(MulticastSocket.java:277)
//	        at com.gbcom.ccsv3.transport.multidiscover.MultiReceiver.start(MultiReceiver.java:56)
//	        at com.gbcom.ccsv3.transport.multidiscover.DiscoverServer$1.run(DiscoverServer.java:50)
//	        at java.lang.Thread.run(Thread.java:662)
			msr.joinGroup(group);// 加入连接
			byte[] buffer = new byte[50];
			LOGGER.info("Thread=" + Thread.currentThread()
					+ " ; MultiReceiver started!!! (启动时间: " + new Date() + ")");
			started = true;
			while (true) {
				try {
					// 建立一个指定缓冲区大小的数据包
					DatagramPacket dp = new DatagramPacket(buffer,
							buffer.length);
					msr.receive(dp);
					DpDispatcher.getInstance().addDp(dp);
				} catch (Exception e) {
					LOGGER.error("receiver is error , continue", e);
					continue;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
			LOGGER.error("MultiDiscover --- start -- error", e);
		} finally {
			if (msr != null) {
				try {
					msr.leaveGroup(group);
					msr.close();
				} catch (Exception e) {
					LOGGER.error("MultiDiscover --- start finall -- error", e);
				}
			}
		}

	}

	/**
	 * 停止
	 */
	@Override
	public void stop() {
		if (msr != null) {
			try {
				msr.leaveGroup(group);
				msr.close();
			} catch (Exception e) {
				LOGGER.error("MultiDiscover --- start finall -- error", e);
			}
		}
		started = false;

	}

	/**
	 * 是否开启
	 * 
	 * @return started
	 */
	@Override
	public boolean isStarted() {
		return started;
	}

}



发送者
package com.gbcom.ccsv3.transport.multidiscover;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * UDP 发现
 * 
 * <p>
 * 
 * @author syz
 *         <p>
 * @date 2015-6-26,下午04:39:06
 *       <p>
 * @version v1.0.0
 *          <p>
 * @see Sender
 */
public interface Sender {

	/**
	 * 发送消息。 添加ip 和port 兼容单播处理,作为单播的 端口和地址,
	 * 
	 * @param msg
	 *            String
	 * @param ip
	 *            InetAddress
	 * @param port
	 *            int
	 * @throws UnknownHostException
	 *             Exception
	 */
	public void send(String msg, InetAddress ip, int port)
			throws UnknownHostException;

}


组播实现
package com.gbcom.ccsv3.transport.multidiscover;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import org.apache.log4j.Logger;

/**
 * 多播发送者,单利 和 静态类 * 组播 通信端口 1107 单播 通信端口1108
 * 
 * <p>
 * 
 * @author syz
 *         <p>
 * @date 2015-6-26,下午04:25:33
 *       <p>
 * @version v1.0.0
 *          <p>
 * @see MultiSender
 */
public final class MultiSender implements Sender {
	private static final Logger LOGGER = Logger.getLogger(MultiSender.class);

	/**
	 * 多播ip
	 */
	public static final String MULTI_GROUP_IP = "224.7.11.3";
	/**
	 *多播端口
	 */
	public static final int MULTI_GROUP_PORT = 1108;
	private static final MultiSender INSTANCE = new MultiSender();

	/**
	 * 单例模式,获取单例
	 * 
	 * @return MultiSender
	 */
	public static MultiSender getInstance() {
		return INSTANCE;
	}

	private MultiSender() {

	}

	/**
	 * @param msg
	 *            String
	 * @param ip
	 *            InetAddress
	 * @param port
	 *            int
	 * @throws UnknownHostException
	 *             Exception
	 */
	@Override
	public void send(String msg, InetAddress ip, int port)
			throws UnknownHostException {

		InetAddress group = InetAddress.getByName(MULTI_GROUP_IP);// 组播地址
		MulticastSocket mss = null;
		try {
			// mss = new MulticastSocket(MULTI_GROUP_PORT);
			mss = new MulticastSocket(); // 随机端口 client
			mss.joinGroup(group);

			byte[] buffer = msg.getBytes();
			DatagramPacket dp = new DatagramPacket(buffer, buffer.length,
					group, MULTI_GROUP_PORT);
			mss.send(dp);
		} catch (Exception e) {
			e.printStackTrace();
			LOGGER.error("MultiSender -- send", e);
		} finally {
			try {
				if (mss != null) {
					mss.leaveGroup(group);
					mss.close();
				}
			} catch (Exception e) {
				LOGGER.error("MultiSender -- send -- final", e);
			}
		}

	}

}




处理器实现

package com.gbcom.ccsv3.transport.multidiscover;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import com.gbcom.omc.si.common.Const;

/**
 * 转发器,接收dp
 * 
 * <p>
 * 
 * @author syz
 *         <p>
 * @date 2015-6-26,下午04:29:42
 *       <p>
 * @version v1.0.0
 *          <p>
 * @see com.gbcom.ccsv3.transport.multidiscover.DpDispatcher
 */
public class DpDispatcher {
	private static final Logger LOG = Logger.getLogger(DpDispatcher.class);
	private static final int THREAD_NUM = 1;
	private static final int BLOCK_QUEUE_MAX_SIZE = 10000;
	private static final int BLOCK_QUEUE_CLEAR_SIZE = 10000;

	/**
	 * 线程的执行器
	 */
	private ExecutorService executor = null;

	private boolean isRunning = false;
	/**
	 * 上报Trap消息的队列 :SIZE
	 */
	private BlockingQueue<DatagramPacket> dpQueue = new LinkedBlockingQueue<DatagramPacket>(
			BLOCK_QUEUE_MAX_SIZE);

	private static class DpDispatcherHolder {
		private static final DpDispatcher INSTANCE = new DpDispatcher();
	}

	/**
	 * 获取单例对象
	 * 
	 * @return TaskDispatcher
	 */
	public static DpDispatcher getInstance() {
		return DpDispatcherHolder.INSTANCE;
	}

	private DpDispatcher() {
		init();
		start();
	}

	private void init() {
		isRunning = false;
	}

	/**
	 * 添加数据包
	 * 
	 * @param dp
	 *            DatagramPacket
	 */
	public void addDp(DatagramPacket dp) {

		if (!isRunning) {
			LOG
					.error("UdpDispatcher  is not running, the Task below may not process");
		}
		if (LOG.isDebugEnabled()) {
			LOG.debug("add DatagramPacket to Queue,, Address="
					+ dp.getAddress());
		}
		try {
			if (dpQueue.size() >= BLOCK_QUEUE_CLEAR_SIZE) {
				LOG
						.info(" *****cleart request Task***** trap queue size is more than "
								+ BLOCK_QUEUE_CLEAR_SIZE
								+ ";;  CLEAR BlockingQueue");
				dpQueue.clear();
			}
			dpQueue.put(dp);
		} catch (InterruptedException e) {
			LOG.info("/******* add dp InterruptedException*********/");
			LOG.error("add dp to queue interrupted", e);
			LOG.info("/******* add dp InterruptedException  *********/");
		} catch (Exception e) {
			LOG.error("Other Exception  ", e);
		}

	}

	/**
	 * 停止
	 */
	public void stop() {
		executor.shutdownNow();
		isRunning = false;
	}

	/**
	 * 开始
	 */
	public void start() {

		executor = Executors.newCachedThreadPool();
		for (int i = 0; i < THREAD_NUM; i++) {
			executor.execute(new DispatcherTask());
		}
		isRunning = true;
		LOG.info("do Dispatcher task start  , current thread size =  "
				+ THREAD_NUM);

	}

	class DispatcherTask implements Runnable {

		/**
		 * 线程执行方法
		 */
		@Override
		public void run() {

			DatagramPacket dp = null;
			while (!Thread.currentThread().isInterrupted()) {
				try {
					long begin = System.currentTimeMillis();
					dp = dpQueue.take();

					String s = new String(dp.getData(), 0, dp.getLength());
					LOG.info("discover receiver dp , msg=" + s
							+ ",dpQueue size=" + dpQueue.size());
					if (s.equalsIgnoreCase("who")) {
						/*
						 * TransportMapping mapping
						 * =SnmpSingleton.getTransportMapping(); if(mapping
						 * instanceof DefaultUdpTransportMapping){ String ip =
						 * ((DefaultUdpTransportMapping)mapping).getAddress().
						 * getInetAddress().toString();
						 * SenderFactory.getMultiSender().send(ip); }
						 */

						String ip = "NULL";
						int port = 162;
						if (Const.sourceSnmpIp == null
								|| Const.sourceSnmpIp.trim().equals("")) {
							ip = InetAddress.getLocalHost().getHostAddress()
									.toString();
						} else {
							String[] udpSrc = (Const.sourceSnmpIp.trim())
									.split("/");
							if (udpSrc.length < 1 || udpSrc.length > 2) {
								ip = InetAddress.getLocalHost()
										.getHostAddress().toString();
							} else {
								ip = udpSrc[0];
								port = (udpSrc.length == 2) ? Integer
										.parseInt(udpSrc[1]) : 162;
							}
						}
						String msg = "IP:" + ip + "," + "PORT:" + port;
						// InetAddress addr =
						// InetAddress.getByName(MultiSender.MULTI_GROUP_IP);
						// SenderFactory.getMultiSender().send(msg,MultiSender.MULTI_GROUP_IP,MultiSender.MULTI_GROUP_PORT);

						SenderFactory.getUniSender().send(msg, dp.getAddress(),
								dp.getPort());
					} else {
						// LOG.error("OTHER INFOR---"+s);
					}

					if (LOG.isDebugEnabled()) {

						LOG.info("process Task  success, thread="
								+ Thread.currentThread().getName()
								+ "  ;spend time :total= "
								+ ((System.currentTimeMillis() - begin) / 1000)
								+ "s  || the queue size is not actually:"
								+ dpQueue.size());
					}
				} catch (InterruptedException e) {
					LOG
							.info("/******* DP Dispatcher  InterruptedException*********/");
					LOG.error("DP Dispatcher thread interrupted ;; tread = "
							+ Thread.currentThread().getName(), e);
					LOG
							.info("/******* DP Dispatcher  InterruptedException*********/");
					Thread.currentThread().interrupt();
					break;
				} catch (Exception e) {
					LOG.error("DP Dispatcher thread exception", e);
					continue;
				}
			}

		}
	}

	public static class SenderFactory {
		/**
		 * 获取多播发送者
		 * 
		 * @return Sender
		 */
		public static Sender getMultiSender() {
			return MultiSender.getInstance();
		}

		/**
		 * 获取单播发送者
		 * 
		 * @return UniSender
		 */
		public static Sender getUniSender() {
			return UniSender.getInstance();
		}

	}

}



使用 打开发现服务器,,扩展处理器模块即可。




示例代码仅支持udp,完整代码参考附件。
分享到:
评论

相关推荐

    rtp服务器端的java实现

    以上就是关于"RTP服务器端的Java实现"的核心知识点。通过学习和实践这些内容,可以开发出能够稳定运行、支持视频播放的RTP服务器。在实际项目中,可能还需要根据具体需求进行扩展,比如支持组播、加密传输等高级特性...

    java开发的组播聊天室

    本项目名为“java开发的组播聊天室”,其主要特点在于利用了IP组播协议,使得信息能够高效地广播到多个接收者,从而实现多人同时聊天的功能,同时也支持私聊模式。 组播是一种网络通信方式,它允许发送者一次性将...

    java udp 组播

    下面将详细阐述如何使用Java实现UDP组播。 首先,我们要了解UDP(User Datagram Protocol)是传输层的一种无连接协议,它比TCP(Transmission Control Protocol)更快,但不保证数据的顺序和可靠性。组播则是基于...

    Java聊天室,组播+私聊

    Java聊天室是一个基于Java编程语言实现的交互式通信平台,它结合了组播和私聊功能,使得用户既能参与群组讨论,也能进行一对一的秘密对话。在这个系统中,组播技术用于实现多人同时在线聊天的功能,而私聊则确保了...

    安卓网站交互JSONxmlWebserviceUPnP相关-AndroidUDP组播的例子包含Android组播Server和Client端发送端和接收端.zip

    中的"JavaApk源码说明.txt"可能包含了关于源码的详细解释和使用指南,"下载更多打包源码~.url"可能是一个链接,指向更多相关的Android开发资源,"UDP_Multicast_Client"和"UDP_Multicast_Server"则是实际的Android...

    组播服务端及客户端代码实现

    在本项目中,“组播服务端及客户端代码实现”是一个涉及组播协议的实际编程实践,主要关注如何利用编程语言构建能够发送和接收组播报文的服务器端和客户端。 服务器端在组播中扮演着数据源的角色,它会向特定的组播...

    javacv实现同屏浏览

    javacv推送桌面到rtmp服务器,拉取rtmp流并播放,实现同屏浏览 采用的javacv版本javacv-platform-1.5-bin,将javacv-platform-1.5-bin的所有jar都拷贝到了工程的lib目录,并引用了javacpp.jar, javacv-platform.jar...

    用JAVA实现P2P网络通信

    "用JAVA实现P2P网络通信" 本文将详细介绍如何使用JAVA实现P2P网络通信,分析P2P基本概念及其基本工作原理,并探讨了用JAVA实现p2p...用JAVA实现P2P网络通信需要了解P2P基本概念、多播技术、TCP和UDP协议等相关技术。

    java+FFmpeg+JavaCV实现无控件HTML页面视频实时预览,录像等,RTSP协议

    综上所述,这个项目融合了多个高级IT技术,展示了如何在Java环境中集成FFmpeg和JavaCV来实现复杂的实时视频处理功能,同时利用RTSP协议和WebSocket提供无控件的HTML5视频预览和录像服务。这样的技术组合在物联网、...

    javaUDP组播群聊小程序

    Java UDP组播群聊小程序是一种基于用户数据报协议(UDP)实现的多点通信应用,适合初学者了解网络编程和组播技术。UDP是一种无连接的传输层协议,相较于TCP,它提供了更低的延迟和更高的效率,但不保证数据的可靠...

    基于Java的多网卡计算机IP 组播通信的设计和实现.pdf

    基于 Java 的多网卡计算机 IP 组播通信的设计和实现 本文介绍了基于 Java 的多网卡计算机 IP 组播通信的设计和实现。IP 组播技术是网络中的一种高效数据传输方式,可以实现点到多点的高效数据传输。在多网卡设备上...

    组播技术的基本实现

    下面我们将深入探讨组播技术的基本实现。 首先,我们需要理解IP组播的基础概念。IP组播是基于IP协议的一种通信方式,它使用特定的IP地址范围(224.0.0.0至239.255.255.255)来标识组播组。这些IP地址不对应任何特定...

    java_udp.rar_java udp_组播

    总结起来,Java UDP组播是构建P2P应用程序的关键技术,它利用了无连接的特性以及组播功能,实现了数据高效、多点的分发。这个实例为你提供了一个实践的起点,通过学习和运行这个代码,你可以更好地掌握Java中的UDP组...

    java的Socket实现的多人聊天程序

    Java的Socket实现的多人聊天程序是一个基于网络通信的项目,主要利用了Java的Socket类来构建客户端和服务端的通信桥梁。Socket是TCP/IP协议的一部分,它提供了两台计算机之间进行数据传输的基础。在这个项目中,...

    Java利用UDP协议实现多广播组通信源码

    本篇将深入探讨如何利用Java的UDP(User Datagram Protocol)协议实现多播广播组通信,并结合GUI(图形用户界面)进行可视化操作。 首先,UDP是一种无连接的、不可靠的传输层协议,它比TCP(Transmission Control ...

    组播组中发送和接受数据.rar

    在Java编程语言中,组播通信是一种网络通信技术,它允许单个数据源向多个...例如,在实时视频流应用中,服务器只需要向组播组发送一次数据,所有组内的客户端都可以接收到,大大减少了服务器的负担和网络带宽的消耗。

    [java语言]组播技术和JGroups.pdf

    示例代码中展示了一个简单的组播服务器(MulticastServer)和客户端(MulticastClient)的实现。组播服务器创建了一个MulticastSocket,设置了网络接口,并加入了组播组。它通过监听端口7777来接收发往组播地址***.*...

    UDP.rar_Java 组播_UDP_UDP语音

    通过组播,服务器只需要发送一次语音数据,所有参与者都能收到,减少了服务器的负载和网络带宽的使用。对于文字交流,虽然不如语音那么实时,但依然可以选择 UDP,因为它可以提供比TCP更快的响应速度,尤其是在用户...

Global site tag (gtag.js) - Google Analytics