`
yangwei0915
  • 浏览: 465269 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

使用Java的nio实现高效能的网络通信

 
阅读更多

在使用Java的API构建网络通信的应用时(尤其是基于移动的网络),有两种技术可供选择。第一种为直接使用Socket/Server Socket+输入输出流来构建,另外一种使用ServerSocketChannel+Selector。前者为阻塞式通信,需要服务端启动很多线程监听每一个客户端的Socket,通信效率低,且服务端容易受客户端的影响。比如当某个客户端当掉之后,服务的向这个客户端写数据的线程就会一直阻塞,直到写数据超时。所以我们在开发网络通信的应用时,首选后者。如下的代码实现了网络聊天/的功能,在加上语音或视频的录制功能又可实现网络的音频会议或视屏会议。网上和书上有很多类似的例子,但大多没有经过很好的测试,比如有一个客户端突然退出或者当掉,会影响服务端的运行。而我们的实际情况是要求服务端不应该受客户端的影响。

 

代码如下:

 

服务端:

package com.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * 非阻塞通信的服务端
 * @author Alex
 *
 */
public class NioServer {
	private class NSServerThread extends Thread {
		private String ip;
		private int port;

		public NSServerThread(String ip, int port) {
			this.ip = ip;
			this.port = port;
		}

		public void run() {
			try {
				init(this.ip, this.port);
			} catch (UnknownHostException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

	public void init(String ip, int port) throws IOException { // 用于检测所有Channel状态的Selector
		System.out.println("打开NioPTTServer 端口为:" + port);
		Selector selector = null;
		// 定义实现编码、解码的字符集对象
		selector = Selector.open();
		// 通过open方法来打开一个未绑定的ServerSocketChannel实例
		ServerSocketChannel server = ServerSocketChannel.open();
		// InetSocketAddress isa = new InetSocketAddress(ip, port);
		InetSocketAddress isa = new InetSocketAddress(ip, port);
		// 将该ServerSocketChannel绑定到指定IP地址
		server.socket().bind(isa);
		// 设置ServerSocket以非阻塞方式工作
		server.configureBlocking(false);
		// 将server注册到指定Selector对象
		server.register(selector, SelectionKey.OP_ACCEPT);
		while (true) {
			int keys = selector.select();
			// System.out.println("keys:"+keys);
			if (keys > 0) {
				// 依次处理selector上的每个已选择的SelectionKey
				// SelectionKey removeReadSk=null;
				try {
					for (SelectionKey sk : selector.selectedKeys()) {
						// 从selector上的已选择Key集中删除正在处理的SelectionKey
						selector.selectedKeys().remove(sk);
						// 如果sk对应的通道包含客户端的连接请求
						if (sk.isAcceptable()) {
							// 调用accept方法接受连接,产生服务器端对应的SocketChannel
							SocketChannel sc = server.accept();
							// 设置采用非阻塞模式
							sc.configureBlocking(false);
							// 将该SocketChannel也注册到selector
							sc.register(selector, SelectionKey.OP_READ);
							// 将sk对应的Channel设置成准备接受其他请求
							sk.interestOps(SelectionKey.OP_ACCEPT);
						}
						// 如果sk对应的通道有数据需要读取
						if (sk.isReadable()) {
							// 获取该SelectionKey对应的Channel,该Channel中有可读的数据
							SocketChannel sc = (SocketChannel) sk.channel();
							// 定义准备执行读取数据的ByteBuffer
							ByteBuffer buff = ByteBuffer.allocate(1024);
							// 开始读取数据
							int len = 0;
							try {
								if ((len = sc.read(buff)) > 0) {
									buff.flip();// 缓存 2指针复位 准备下次读取数据
									System.out.println("读取数据:" + buff.array());
									sk.interestOps(SelectionKey.OP_READ);
								}else{
									System.out.println("没有数据读取,关闭通道");
									sk.cancel();
									if (sk.channel() != null) {
										sk.channel().close();
									}
								}
								// 将sk对应的Channel设置成准备下一次读取
								
							}
							// 如果捕捉到该sk对应的Channel出现了异常,即表明该Channel
							// 对应的Client出现了问题,所以从Selector中取消sk的注册
							catch (IOException ex) {
								// 从Selector中删除指定的SelectionKey
								ex.printStackTrace();
								sk.cancel();
								if (sk.channel() != null) {
									sk.channel().close();
								}
								System.out.println("关闭一个客户端");
							}
							// 如果content的长度大于0,即聊天信息不为空
							if (len > 0) {
								// 遍历该selector里注册的所有SelectKey
								SelectionKey writeKey = null;
								for (SelectionKey key : selector.keys()) {
									// 获取该key对应的Channel

									Channel targetChannel = key.channel();
									// 如果该channel是SocketChannel对象
									if (targetChannel instanceof SocketChannel) {
										// && key != sk) {
										// 将读到的内容写入该Channel中 ,返回到客户端
										if (targetChannel instanceof SocketChannel
												&& key != sk) {
											SocketChannel dest = null;
											try {
												// 将读到的内容写入该Channel中 ,返回到客户端
												dest = (SocketChannel) targetChannel;
												System.out.println("写数据:"
														+ buff.array());
												dest.write(buff);
											} catch (Exception e) {
												// 写异常,关闭通道
												e.printStackTrace();
												if (dest != null) {
													dest.close();
												}
												targetChannel.close();
												writeKey = key;
											}
										}
									}
								}
								if (writeKey != null) {
									writeKey.cancel();
									if (writeKey.channel() != null) {
										writeKey.channel().close();
									}
								}
							}
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
					// selector.close();
					// server.close();
					// System.out.println("发生异常,重新打开端口");
					// init(ip,port);
					// throw new RuntimeException(e);
				}
			}
		}
	}

	public static void main(String[] args) throws IOException {
		// System.out.println(InetAddress.getLocalHost().getHostAddress());
		String host = InetAddress.getLocalHost().getHostAddress();
		int port = 30000;
		new NioServer().new NSServerThread(host, port).start();
		// new NServer().init(InetAddress.getLocalHost().getHostAddress(),
		// 30000);
		System.out.println("Nio服务端启动了,host:" + host);
	}
}

 

 

客户端代码:

package com.nio;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

/**
 * 非阻塞通信的服务端
 * @author Alex
 *
 */
public class NioClient {

	public void init(String ip, int port) throws IOException {
		// 定义检测SocketChannel的Selector对象
		Selector selector = null;
		// 定义处理编码和解码的字符集
		//Charset charset = Charset.forName("UTF-8");
		// 客户端SocketChannel
		SocketChannel sc = null;
		selector = Selector.open();
		InetSocketAddress isa = new InetSocketAddress(ip, port);
		// 调用open静态方法创建连接到指定主机的SocketChannel
		sc = SocketChannel.open(isa);
		// 设置该sc以非阻塞方式工作
		sc.configureBlocking(false);
		// 将SocketChannel对象注册到指定Selector
		sc.register(selector, SelectionKey.OP_READ);
		// 启动读取服务器端数据的线程------------
		new ReadDataThread(selector).start();
		new SendDataThread(sc,selector).start();
	}

	private class SendDataThread extends Thread{
		SocketChannel sc;
		Selector selector;
		public SendDataThread(SocketChannel sc,Selector selector){
			this.sc=sc;
			this.selector=selector;
		}
		public void run(){
			// 创建键盘输入流-----------
			Scanner scan = new Scanner(System.in);
			ByteBuffer byteBuf=ByteBuffer.allocate(152);
			File file=new File("D:/未命名2.jpg");
			try{
				InputStream in=new FileInputStream(file);
				int len=0;
				while (scan.hasNextLine()) {
					// 读取键盘输入
					String line = scan.nextLine();
					
					// 将键盘输入的内容输出到SocketChannel中
					if("send".equals(line.trim())){
						byte[] buf=new byte[152];
						if((len=in.read(buf))>-1){
							byteBuf.put(buf);
							byteBuf.flip();
							sc.write(byteBuf);
							byteBuf.clear();
						}
						byteBuf.put(buf);
						byteBuf.flip();
						sc.write(byteBuf);
						byteBuf.clear();
					}else if("quit".equals(line.trim())){
						sc.close();
						selector.selectedKeys().clear();
						selector.close();
					}
					//sc.write(charset.encode(line));
				}
			}catch(Exception e){
				e.printStackTrace();
			}
			
		}
	}
	//读取服务器数据的----客户端监听线程
	private class ReadDataThread extends Thread {
		Selector selector;

		public ReadDataThread(Selector selector)
				throws IOException {
			this.selector = selector;
		}
		public void run() {
			try {
				ByteBuffer buff = ByteBuffer.allocate(152);
				while (selector.select() > 0) {
					// 遍历每个有可用IO操作Channel对应的SelectionKey
					for (SelectionKey sk : selector.selectedKeys()) {
						// 删除正在处理的SelectionKey
						selector.selectedKeys().remove(sk);
						// 如果该SelectionKey对应的Channel中有可读的数据
						if (sk.isReadable()) {
							// 使用NIO读取Channel中的数据
							SocketChannel sc = (SocketChannel) sk.channel();
							String content = "";
							if (sc.read(buff) > 0) {
								sc.read(buff);
								buff.flip();
								content = new String(buff.array());// charset.decode(buff);
								buff.clear();
							}
							// 打印输出读取的内容
							System.out.println("接收的数据:" + content);
							// 为下一次读取作准备
							sk.interestOps(SelectionKey.OP_READ);
						}
					}
				}
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws IOException {
		
		new NioClient().init("192.168.42.128", 30000);
	}
}

 

分享到:
评论

相关推荐

    基于java的高性能web代理程序 hyk-proxy.zip

    Java是一种广泛使用的面向对象的编程语言,以其跨平台、高效能和丰富的类库而闻名。在“基于java的高性能web代理程序”中,Java的技术基础扮演了核心角色。Java的多线程特性使得它非常适合构建能够处理大量并发请求...

    基于java aio研发的网络编程框架,从收集到的案例来看,用t-io做物联网、IM、客服的比较多,堪称殿堂级网络开发框架

    Java AIO,全称为Asynchronous Input/Output,是Java NIO的一个扩展,它提供了一种非阻塞的I/O操作方式,使得开发者可以更高效地处理网络通信。T-io就是一个基于Java AIO实现的高性能、易用且稳定的网络编程框架。它...

    基于UDPjava聊天程序

    1. **UDP协议**:UDP是一种无连接的、不可靠的传输层协议,它不保证数据包的顺序、完整性和可靠性,但具有低延迟和高效能的特点,适合实时通信或对速度要求较高的场景。 2. **Java网络编程基础**:在Java中,进行...

    经典java程序

    Java是一种广泛使用的面向对象的编程语言,以其跨平台、高性能、丰富的类库和高效能而闻名。"经典Java程序"这个标题暗示了我们可能会探讨一些Java编程中的基础概念、核心特性以及一些常见的经典示例。 Java的核心...

    软件设计师专题12JAVA程序设计语言

    Java程序设计语言是软件设计师必备的核心技能之一,它以其跨平台、面向对象和高效能的特点在IT行业中占据了重要地位。本专题将深入探讨Java语言的关键概念、语法特性以及实际应用,帮助软件设计师提升编程能力。 一...

    Java多线程服务器原理.pdf

    在计算机科学中,特别是在网络编程领域,多线程技术对于实现高效能的服务器应用至关重要。Java语言提供了丰富的多线程支持,使得开发者能够轻松地创建和管理多个执行线程,从而实现服务器对多个客户端请求的并发处理...

    Java学习资料大总结

    Java编程语言是全球范围内广泛应用的面向对象的编程语言,它以其跨平台、高效能和强大的类库而闻名。"Java学习资料大总结"这个压缩包很可能是为了帮助初学者或有经验的开发者系统地掌握Java的各项核心概念和技术。在...

    JAVA性能调优分享.doc

    在网络通信方面,可以利用NIO(非阻塞I/O)实现数据的多元化传输,批处理和缓存策略减少网络传输次数,从而降低ACK校验的开销。 JDK提供了一系列内置的监控工具,如JConsole、VisualVM和JMX,可以帮助开发者了解JVM...

    一款Android平台UDP双向通信源码

    在Android平台上进行网络通信时,UDP(User Datagram Protocol)是一种常用的选择,因为它具有无连接、低延迟和高效能的特点。本源码示例是针对Android应用程序设计的,它实现了UDP的双向通信,允许数据在客户端和...

    dubbo-java-3.1.5

    1. 高性能:Dubbo基于NIO非阻塞I/O模型,实现了高并发处理能力,有效降低了系统资源的占用。 2. 服务治理:提供了服务注册、发现、调用、负载均衡、熔断、降级等全面的服务治理功能,确保了微服务的稳定性和可靠性。...

    httpcore-4.3.jar.zip

    它通过非阻塞I/O(NIO)实现这一功能,使得在高并发场景下,系统资源能得到更有效的利用。 3. **请求与响应处理**:HttpCore提供了一套灵活的API来构建和解析HTTP请求与响应。开发者可以通过自定义请求头、设置请求...

    高性能Server-Reactor模型.pdf

    高性能Server-Reactor模型是构建高效能网络服务的核心设计模式,尤其在Java领域中广泛应用。该模型主要用于解决C/S(客户端/服务器)架构中的高并发、低延迟问题,特别是在现代互联网应用如电子邮件、视频流媒体、...

    Netty 17道面试题和答案.docx

    Netty 是一个高性能、易用的网络通信框架,它基于 NIO(非阻塞I/O)设计,适用于构建高并发、低延迟的网络应用。Netty 提供了统一的 API,支持多种传输类型,如TCP、UDP,并且可以处理阻塞和非阻塞的连接。它内置了...

    Java性能优化技巧集锦.doc

    JDK 1.4引入的`java.nio`库提供了非阻塞I/O支持,这对于高并发的I/O操作非常有利,避免了大量线程创建的开销。在早期JDK版本中,可以寻找第三方库来支持非阻塞I/O。 3. **慎用异常** 异常处理应该仅用于错误情况...

    web server中的Tomcat架包

    在Web服务器领域,Tomcat以其轻量级、高效能和易于管理的特点,被广泛应用于各种规模的企业级应用开发和部署。 【描述】:在Web服务中,Tomcat扮演了核心角色,尤其是在Java开发中。作为Java标准版(Java SE)的一...

    Redis 客户端 JRedis.zip

    JRedis 内部使用非阻塞 I/O 模型,通常是 NIO (Java New I/O),以实现高效的网络通信。这意味着它能够在单个线程中处理多个连接,减少了线程切换的开销,提升了性能。同时,JRedis 还支持连接池,允许开发者管理多个...

    自定义rpc框架

    在自定义RPC框架中,可以使用Java NIO库进行优化,实现更高效的I/O操作。 SOA(Service-Oriented Architecture)解决方案是一种面向服务的架构,强调将业务功能封装为独立的服务,通过标准接口进行交互。在自定义...

    EU4Bootcamp:Cyber​​tek Java训练营

    学员将学习如何使用File类进行文件操作,使用Socket编程实现网络通信,以及NIO的非阻塞特性,这对于构建高性能的服务器端应用至关重要。 在面向对象编程方面,训练营会深入探讨设计模式,如单例模式、工厂模式、...

    java多线程tcpsocketserver源码-fcgi:fcgi

    FastCGI在Java中实现时,常常会与Socket服务端编程相结合,以创建高效能的Web服务。 在Java中,创建一个多线程TCP Socket服务器主要涉及以下知识点: 1. **Java套接字(Socket)**:Java提供Socket类来处理客户端...

Global site tag (gtag.js) - Google Analytics