UDP发送接收数据类:
/** * udp 数据报的发送和接收 * @author yanlei * */ public class UDPPort implements Runnable{ private static Logger logger = Logger.getLogger(UDPPort.class); /** * 本地端口,未指定,则是随机端口 */ int localPort = 0; /** * 接收缓冲区大小 */ int bufferSize= 1024; public int getLocalPort() { return localPort; } public void setLocalPort(int localPort) { this.localPort = localPort; } public int getBufferSize() { return bufferSize; } public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; } ExecutorService executorService = null; DatagramChannel datagramChannel = null; Selector selector = null; ByteBuffer byteBuffer = null; public boolean open() throws IOException{ datagramChannel = DatagramChannel.open();//打开通道 if(this.localPort>0){ datagramChannel.socket().bind(new InetSocketAddress(this.localPort));//绑定本地端口 } byteBuffer = ByteBuffer.allocate(this.bufferSize); if(this.updHandler != null){ datagramChannel.configureBlocking(false);//配置成非阻塞模式 selector = Selector.open();//打开选择器 datagramChannel.register(selector, SelectionKey.OP_READ);//注册监听可读取事件 executorService = Executors.newCachedThreadPool(); executorService.execute(this); } return true; } UDPHandler updHandler; public void registerUdpHanndler(UDPHandler updHandler ){ this.updHandler = updHandler; } Queue<SendData> sendDataQueue = new ConcurrentLinkedQueue<SendData>(); @Override public void run() { // TODO Auto-generated method stub try{ while(true){ selector.select();//阻塞直至收到数据包,返回值可能为零,但有事件发生,因此不以返回值判断事件数 if(this.stop){ break; } Set<SelectionKey> keys = selector.selectedKeys();//获取发生读取事件的注册键 Iterator<SelectionKey> iterator = keys.iterator(); while(iterator.hasNext()){//遍历 SelectionKey key=iterator.next(); iterator.remove();//需要手工移除注册键,否则下次selectedKeys里来包括它(虽然该selectionKey对应的通道上没有事件) // DatagramChannel dc = (DatagramChannel)key.channel();//获取接收数据通道==datagramChannel if(key.isWritable()){//通道可以写入了 if(!sendDataQueue.isEmpty()){//重发队列不为空 SendData sendData = null; while((sendData = sendDataQueue.peek()) != null){ if(!this.send(sendData.getTarget(), sendData.getByteBuffer())){//重发失败 break; }else{ sendDataQueue.poll();//重发成功 } } }else{//重发队列空了,不再监控可写入事件,只监控接收数据事件 key.interestOps(SelectionKey.OP_READ); } }else if(key.isReadable()){//接收数据事件,有数据被接收了 byteBuffer.clear(); final SocketAddress from = this.receive(byteBuffer);//接收数据包,返回数据来源 if(from != null){ byteBuffer.flip(); byte [] data = new byte [byteBuffer.remaining()];//实际数据大小的数组 byteBuffer.get(data); final ByteBuffer receiveBuffer = ByteBuffer.wrap(data); ReceiveData receiveData = new ReceiveData(); receiveData.setFrom(from); receiveData.setByteBuffer(receiveBuffer); ReceiveHandler receiveHandler = new ReceiveHandler(receiveData);//处理类逻辑线程 this.executorService.execute(receiveHandler); } } } } }catch(IOException e){ this.ioException = e; logger.error("",e); close(); } } IOException ioException= null; private boolean isOccurException (){ return ioException !=null; } private List<SendData> getUnSendDataList(){ List<SendData> sendDataList = new ArrayList<SendData>(); while(!this.sendDataQueue.isEmpty()){ sendDataList.add(this.sendDataQueue.poll()); } return sendDataList; } private volatile boolean stop; private volatile boolean closed; public synchronized void close() { if(closed ){ return ; } this.stop = true; this.selector.wakeup();//不接收数据 try{ if(this.isOccurException()){//发生IO异常 List<ReceiveData> receiveDataList = new ArrayList<ReceiveData>(); List<Runnable> runnableList = executorService.shutdownNow();//停止线程池,获取等待队列中的任务 if(runnableList != null && runnableList.size()>0){ for(Runnable runnable:runnableList){ ReceiveHandler receiveHandler = (ReceiveHandler)runnable; ReceiveData receiveData = receiveHandler.getReceiveData(); receiveDataList.add(receiveData); } } try{ this.updHandler.onError(ioException, receiveDataList, getUnSendDataList()); }catch(Exception e){ logger.error("",e); } }else{//正常被调用关闭 try{ this.updHandler.onClose(getUnSendDataList());//未发送出去的数据包 }catch(Exception e){ logger.error("",e); } } }catch(Exception e){ logger.error("",e); }finally{ try{ executorService.shutdown(); while(!executorService.awaitTermination(1, TimeUnit.SECONDS)){}//阻塞直至所有任务都完成,线程退出,关闭连接 this.selector.close(); this.datagramChannel.close(); }catch(Exception e){} } closed = true; } private boolean send(SocketAddress target,ByteBuffer src) throws IOException{ int length = src.remaining(); int sendNum = this.datagramChannel.send(src, target); return sendNum==length; } public boolean send(SocketAddress target,ByteBuffer ... srcs) { try{ if(closed ){ return false; } int length=0; ByteBuffer allByteBuffer = null; if(srcs.length>1){ for(ByteBuffer buffer:srcs){ length += buffer.remaining(); } allByteBuffer = ByteBuffer.allocate(length); for(ByteBuffer buffer:srcs){ allByteBuffer.put(buffer); } allByteBuffer.flip(); }else{ allByteBuffer = srcs[0]; length = allByteBuffer.remaining(); } int sendNum = this.datagramChannel.send(allByteBuffer, target); //返回值 sendNum==length ,说明数据全被发送出去了,返回值sendNum=0,说明底层输出缓冲区中没有足够的空间供数据报,则未发送任何字节数据 //与socketChannel 不同,socketChannel在底层输出缓冲区中没有足够的空间时,能发送几个字节就发几个字节。返回值为发送字节个数。 if(sendNum ==0){ SendData sendData = new SendData(); sendData.setByteBuffer(allByteBuffer); sendData.setTarget(target); this.sendDataQueue.add(sendData); SelectionKey key = datagramChannel.keyFor(this.selector); key.interestOps(SelectionKey.OP_READ |SelectionKey.OP_WRITE); this.selector.wakeup(); } return true; }catch(IOException e){ this.ioException = e; logger.error("",e); close(); return false; } } public SocketAddress receive(ByteBuffer dest) throws IOException{ SocketAddress sa = this.datagramChannel.receive(dest);//如果数据包的数据大于缓冲区大小,则大出部分被丢弃,因此需要根据实际情况配置缓冲区大小 return sa; } class ReceiveHandler implements Runnable{ ReceiveData receiveData = null; public ReceiveHandler(ReceiveData receiveData){ this.receiveData = receiveData; } public ReceiveData getReceiveData() { return receiveData; } @Override public void run() { // TODO Auto-generated method stub updHandler.onReceive(receiveData.getFrom(), receiveData.getByteBuffer(), UDPPort.this); } } }
接收数据报的接口,需子类实现具体的逻辑
/** * * 用于处理UDP数据报的接口 * 由子类实现业务逻辑,并在UDPPort类中注册 * 当接收到数据报时,就触发onReceive方法 * @author yanlei * */ public interface UDPHandler { public void onReceive(SocketAddress from,ByteBuffer receiveBuffer,UDPPort udpPort); public void onError(IOException e,List<ReceiveData> receiveDataList,List<SendData> unSendDataList); public void onClose(List<SendData> unSendDataList); }
Server 端:
/** * 聊天室服务端 * @author yanlei * */ public class ChatServer extends DefaultUDPHandler{ private Map <SocketAddress,byte [] > userMap= new ConcurrentHashMap<SocketAddress,byte []>();//地址与用户名对应关系 public void addUser(SocketAddress sa,byte [] nameByteBuffer){ userMap.put(sa, nameByteBuffer); } Charset charset =Charset.forName("GBK"); public byte [] getNameBytes(SocketAddress sa){ return userMap.get(sa); } public byte [] removeUser(SocketAddress sa){ return userMap.remove(sa); } @Override public void onReceive(SocketAddress from, ByteBuffer messageByteBuffer, UDPPort udpPort) { // TODO Auto-generated method stub byte command = messageByteBuffer.get(); ByteBuffer messageToAllBuffer = null; switch(command){ case 1: //login messageToAllBuffer = welcomeUser(messageByteBuffer,from); break; case 2: //exit; messageToAllBuffer = userExit(from); break; case 3: //talk; messageToAllBuffer = userTalk(messageByteBuffer,from); break; } if(messageToAllBuffer != null){ talkToAll(messageToAllBuffer,udpPort);//talk to all } } byte [] talkCommand = new byte []{3}; public ByteBuffer welcomeUser(ByteBuffer messageByteBuffer,SocketAddress socketAddress) { byte []name = new byte [messageByteBuffer.remaining()]; messageByteBuffer.get(name); addUser(socketAddress,name); ByteBuffer loginBytes = unionAllBytes(talkCommand," welcome ".getBytes(),name," to login in".getBytes()); System.out.println("user <"+new String(name,charset)+"> login"); return loginBytes; } public ByteBuffer userExit(SocketAddress socketAddress ) { byte []name = removeUser(socketAddress); ByteBuffer exitBytes = unionAllBytes(talkCommand,name,"to exit!".getBytes()); System.out.println("user <"+new String(name,charset)+"> to exit"); return exitBytes; } public ByteBuffer userTalk(ByteBuffer messageByteBuffer,SocketAddress socketAddress){ byte []name = getNameBytes(socketAddress); ByteBuffer taskToAllBytes = unionAllBytes(talkCommand,name,":".getBytes(),messageByteBuffer); System.out.println("user <"+new String(name,charset)+"> talk:"+new String(Arrays.copyOfRange(messageByteBuffer.array(), 1, messageByteBuffer.limit()),charset)); return taskToAllBytes; } List<SendData> sendDataList = new ArrayList<SendData>(); public void talkToAll(ByteBuffer messageByteBuffer ,UDPPort udpPort) { Iterator<SocketAddress> iterator = userMap.keySet().iterator(); while(iterator.hasNext()){ SocketAddress sa = iterator.next(); if(!udpPort.send(sa, messageByteBuffer)){//发生异常 while(iterator.hasNext()){ SendData sendData = new SendData(); sendData.setTarget(sa); sendData.setByteBuffer(messageByteBuffer); sendDataList.add(sendData); } break; } messageByteBuffer.flip(); } } private ByteBuffer unionAllBytes(Object ... objs){ int length = 0; for(Object obj :objs){ if(obj.getClass().isArray()){ byte [] bytes = (byte [])obj; length +=bytes.length; }else if(ByteBuffer.class.isInstance(obj)){ ByteBuffer byteBuffer = (ByteBuffer)obj; length +=byteBuffer.remaining(); } } ByteBuffer byteBuffer = ByteBuffer.allocate(length); for(Object obj :objs){ if(obj.getClass().isArray()){ byteBuffer.put((byte [])obj); }else if(ByteBuffer.class.isInstance(obj)){ byteBuffer.put((ByteBuffer)obj); } } byteBuffer.flip(); return byteBuffer; } public static void main(String args []){ try{ System.out.print("please input server port:"); Scanner scanner = new Scanner(System.in); String serverPort = scanner.nextLine(); ChatServer server = new ChatServer(); UDPPort udpPort = new UDPPort(); udpPort.setLocalPort(Integer.parseInt(serverPort)); udpPort.registerUdpHanndler(server); udpPort.open(); System.out.println("server monitor on port:"+serverPort); }catch(Exception e){ e.printStackTrace(); } } }
聊天室客户端:
/** * 聊天室客户端 * @author yanlei * */ public class ChatClient extends DefaultUDPHandler implements Runnable{ SocketAddress serverAddress = null; PrintWriter pw = null; Charset charset =Charset.forName("GBK"); UDPPort udpPort = null; String name=null; public ChatClient (String serverIP,String serverPort,UDPPort udpPort,String name) throws Exception{ InetAddress serverIpAddress = InetAddress.getByAddress(ipToByteArray(serverIP)); serverAddress = new InetSocketAddress(serverIpAddress,Integer.parseInt(serverPort)); pw = new PrintWriter(new OutputStreamWriter(System.out)); this.udpPort = udpPort; this.name = name; } @Override public void onReceive(SocketAddress from, ByteBuffer receiveBuffer, UDPPort udpPort) { // TODO Auto-generated method stub byte command = receiveBuffer.get(); String message = charset.decode(receiveBuffer).toString(); printToConsole(message); } public void printToConsole(String str){ if(pw != null){ pw.println(); pw.println("==> "+str); pw.print("<input>:"); pw.flush(); } } private ByteBuffer loginCommand = ByteBuffer.wrap(new byte []{1}); private ByteBuffer exitCommand = ByteBuffer.wrap(new byte []{2}); private ByteBuffer talkCommand = ByteBuffer.wrap(new byte []{3}); private ByteBuffer getLoginCommand(){ loginCommand.clear(); return loginCommand; } private ByteBuffer getExitCommand(){ exitCommand.clear(); return exitCommand; } private ByteBuffer getTalkCommand(){ talkCommand.clear(); return talkCommand; } public boolean toTalk(String message) { return udpPort.send(serverAddress,getTalkCommand(),ByteBuffer.wrap(message.getBytes()));//login } public void login(){ udpPort.send(serverAddress,getLoginCommand(),ByteBuffer.wrap(this.name.getBytes()));//login } public void toExit() throws IOException{ udpPort.send(serverAddress,getExitCommand()); } volatile boolean exceptionFlag = false; public void setOccurException() { exceptionFlag = true; } public boolean isOccurException(){ return exceptionFlag; } @Override public void run() { // TODO Auto-generated method stub try{ login(); Scanner scanner = new Scanner(System.in); System.out.println(); System.out.print("input:"); while(true){ String line = scanner.nextLine(); if(isOccurException()){//异常退出 break; } if(line != null){ if(line.trim().equals("exit")){ toExit(); break; }else{ if(!toTalk(line)){ System.out.println(" occur IOException to exit input thread"); break; } } } } }catch(IOException e){ e.printStackTrace(); }finally{ udpPort.close(); } } @Override public void onError(IOException e, List<ReceiveData> receiveDataList, List<SendData> unSendDataList) { // TODO Auto-generated method stub this.setOccurException(); } private static byte [] ipToByteArray(String ip){ String [] temp = ip.split("\\."); byte [] ips = new byte [4]; for(int i=0;i<ips.length;i++){ ips[i]= (byte)Integer.parseInt(temp[i]); } return ips; } public static void main(String [] args){ try{ System.out.print("please input server ip:"); Scanner scanner = new Scanner(System.in); String serverIp = scanner.nextLine(); System.out.print("please input server port:"); String serverPort = scanner.nextLine(); System.out.print("please input your name:"); String name = scanner.nextLine(); UDPPort udpPort =new UDPPort(); ChatClient chatClient = new ChatClient(serverIp,serverPort,udpPort,name); udpPort.registerUdpHanndler(chatClient); udpPort.open(); ExecutorService executorService =Executors.newFixedThreadPool(1); executorService.execute(chatClient); executorService.shutdown(); while(!executorService.awaitTermination(2, TimeUnit.SECONDS)){}//阻塞直至所有任务都完成,线程退出,关闭连接 }catch(Exception e){ e.printStackTrace(); } } }
执行截图:
相关推荐
在这个"UDP简单聊天程序示例"中,我们将探讨如何利用C#中的`UdpClient`类来实现一个简单的UDP聊天应用。 首先,我们要理解`UdpClient`类在.NET Framework中的作用。`UdpClient`是System.Net.Sockets命名空间下的一...
在基于UDP的简单聊天系统中,我们将重点讨论如何利用UDP协议进行数据通信,以及在VC++环境中实现这样的系统。 首先,理解UDP的特点至关重要。与TCP(Transmission Control Protocol)不同,UDP不建立连接,没有握手...
现在,让我们看一下如何实现一个简单的UDP聊天程序: 1. **创建服务器**:服务器首先绑定到一个端口,然后使用`UdpClient.ReceiveAsync()`持续监听。接收到数据后,解码消息并广播给所有已连接的客户端。 2. **...
本项目"**C# UDP 简单的聊天程序**"正是基于C#编程语言实现的一个UDP通信基础示例,它为我们提供了理解如何在C#中操作UDP套接字的基本方法。 首先,让我们了解一下UDP的特点。UDP不保证数据包的顺序、完整性和可靠...
5. **安全性**:虽然UDP聊天程序通常是简单的示例,但在实际应用中,需要考虑网络安全问题,如数据加密、防止中间人攻击等。 6. **可扩展性**:如果希望实现多人聊天,可能需要设计一个服务器来转发消息,或者使用...
在这个"Socket聊天程序示例"中,我们探讨的是如何构建一个简单的网络聊天环境,其中每个终端都可以同时作为服务器和客户端进行交互。 首先,我们要了解Socket的基本概念。在TCP/IP协议栈中,Socket是应用层与传输层...
基于UDP的聊天室编程2(客户端)--由于该程序内含两个小程序,受上传权限20M的影响,只能将两个小程序分开,此为该聊天室程序的客户端,请继续下载基于UDP的聊天室编程1(服务器端),下载完后请放在同一个文件夹中...
总之,这个项目提供了一个基础的UDP聊天程序示例,有助于理解C++中的网络编程概念,以及UDP协议在实际应用中的工作原理。开发者可以通过分析和修改这个程序,学习如何构建更复杂、功能更丰富的网络应用。
本项目"**C# TCP UDP 多线程简易聊天程序源码**"提供了一个基础的学习平台,让我们能够深入理解这两种协议在实际应用中的工作原理,并通过C#编程语言来实现它们。 TCP是一种面向连接的、可靠的协议,它保证了数据的...
UDP不提供连接建立、数据确认、拥塞控制等机制,而是以尽可能快的速度发送数据,适合实时性要求高的应用,如在线视频、音频通话、在线游戏和简单聊天程序等。 本资源“UDP聊天程序.rar”是一个使用UDP协议实现的...
本文将深入探讨如何使用C++编程语言来实现基于TCP和UDP的简单Socket服务,包括echo(回显)、time(时间查询)等功能。 首先,TCP是一种面向连接的、可靠的协议,它保证了数据的顺序和完整性。TCPecho服务通常用于...
UDP 的 Java 聊天程序 ...在本文中,我们将详细介绍如何使用 Java 实现基于 UDP 的聊天程序,并提供了一个简单的示例代码,演示如何使用 DatagramSocket、DatagramPacket 和 InetAddress 三个类实现基本的聊天功能。
总结来说,这个项目是一个实践性质的Java UDP网络编程示例,可以帮助初学者了解如何利用UDP协议进行简单的即时通讯应用开发,同时提供了多线程和网络编程的实践经验。虽然功能可能不完整,但作为学习和研究的基础是...
在这个基于UDP的Java聊天程序设计中,开发者需要实现一个能够相互发送聊天信息的简单聊天工具。 首先,程序的设计分为以下几个主要步骤: 1. **问题重述**:开发一个使用C、C++或Java语言的UDP聊天程序,要求用户...
与TCP(传输控制协议)相比,UDP更适合实时性要求高的应用,例如在线游戏、视频会议或如本示例中的简单聊天室。 MFC(Microsoft Foundation Classes)是微软提供的一套面向对象的C++库,用于开发Windows应用程序。...
标签“udp_vc”、“vc_udp”和“网络聊天”进一步强调了这个程序的核心特性——使用UDP协议进行网络通信,而“vc_聊天程序”则表明它是用VC++实现的聊天应用程序。 在压缩包的文件名称“网络对聊小例程”中,我们...
本篇将详细介绍如何使用C#实现基于UDP协议的简单聊天程序。 首先,理解UDP协议的基本特性: 1. **无连接**:UDP不建立连接,而是直接发送数据包,无需确认对方是否已经接收。 2. **不可靠**:UDP不保证数据包的顺序...
在IT行业中,网络通信是至关重要的一个领域,而局域网(LAN)内的通信更是日常工作中常见的需求。...而提供的"用Delphi编写局域网中的UDP聊天程序.doc"文档可能包含了详细的步骤和代码示例,可作为实际开发的参考。
在基于UDP的聊天程序中,通常分为两个主要部分:消息发送方和消息接收方。首先,让我们详细了解这两个部分的工作原理。 **消息发送方** 1. **创建数据报的网络通道**:使用`DatagramSocket`类创建一个UDP套接字...