`
Mojarra
  • 浏览: 130593 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论
阅读更多

在Reactor模式中,虽然可以采用non-blocking I/O模式,使用Selector注册感兴趣的I/O事件和读取感兴趣的I/O事件,I/O调用者向I/O系统请求一个I/O调用时,I/O立即返回给调用者一个反馈,这些反馈无外乎两大类型,请求已经被执行并且有结果返回,或者当前的通道缓存中无数据可用。第二种情况下,为保险起见,编写程序时需要写一个循环只到有数据被读取为止。在I/O系统处理一个I/O请求时,不可能同时处理第二个I/O操作。

 

异步I/O与同步I/O的最大不同是:当调用者向异步I/O系统请求一个I/O操作的时候,I/O系统自身会调用系统资源启动一个线程去处理该请求,当这个请求被处理完成后,通知调用者,并返回结果给调用者。在这个过程之中,调用者在自身的线程中可以去干一些别的工作。这种I/O处理方式提高了应用程序的可扩展性和性能。

 

JDK7中的NIO2版本引入了异步IO的功能。支持文件异步I/O操作和网络异步I/O操作。针对于网络部分的主要3个类和一个接口。

 

主要的类

1,  AsynchronousChannelGroup,这个类与一组AsynchronousSocketChannel相关联,并且与I/O系统交互,当有I/O请求被处理完成后,会把处理结果通知给一个实现了CompletionHandler接口的对象派发给相关的AsynchronousSocketChannel. 这个类在在使用时需要制定一个ExecutorService对象,ExecutorService对象中的线程池是用来应对与I/O系统交互及与通知调用者时需要的线程开销。

 

2, AsynchronousServerSocketChannel,异步网络ServerSocket,主要用来绑定端口,并接受客户端的连接。accept可以返回一个Future<AsynchronousSocketChannel>对象,调用Future对象的get方法得到一个传入的套接字 的通道对象,如

AsynchronousServerSocketChannel listener = .....
listener.bind(new InetSocketAddress(9001));
Future<AsynchronousSocketChannel> future = listener.accept();
AsynchronousSocketChannel channel = future.get();
..

在Future对象中拿到传入的套接字通道对象后,才可以再次调用accept方法拿到新的套接字通道,否则会抛出AcceptPendingException,虽然是异步的,不过这个方法实质上类似于同步的。

 

另外一种拿到套接字通道对象的方法是使用CompletionHandler的回调方式。这种方式也是Sun鼓励的方式。一旦套接字通道建立完成,I/O系统会调用CompletionHandler对象的completed方法,把已经建立连接的套接字通道传入到completed方法的第一参数当中去。 这个通道建立后,可以让服务器AsynchronousServerSocketChannel对象再次接受新的套接字通道。接着可以用这个刚刚建立的套接字通道来进行数据通讯的工作。

listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
      public void completed(AsynchronousSocketChannel ch, Void att) {
          // accept the next connection
          listener.accept(null, this);

          // handle this connection
          ....
          .....
      }
      public void failed(Throwable exc, Void att) {
          ...
      }
  });
 

 

3, AsynchronousSocketChannel,真正的数据通讯类,使用write方法向channel的缓冲中写入数据,使用read方法读取channel缓冲中的数据,读和写的I/O操作完成后,I/O系统会发出一个通知,把读取、写入的字节数传入到CompletionHandler<Integer, A>对象中的completed方法的第一个参数。与AsynchronousServerSocketChannel的accept方法类型,如果一个I/O操作完成,而另外一个I/O操作又发起的话,会抛出PendingReadException或者PendingWriteException.因此在用AsynchronousSocketChannel处理I/O操作时,要非常小心,确保一个I/O操作发起前,要确保这个通道上的上一个I/O操作已经完成。

 

 

4, CompletionHandler接口,实现这个接口的对象在异步I/O操作中承担着通知的作用,当操作完成后,I/O系统会把通知的内容发送到completed方法中的第一个参数,并且I/O系统会调用completed方法。CompletionHandler按照使用场合可以分成3种:

AsynchronousServerSocket类中中用于接受传入连接套接字通道后的接入套接字通道完成通知,采用泛形声明的方式

CompletionHandler<AsynchronousSocketChannel, A>
 

AsynchronousSocketChannel写数据后的写入完成通知

CompletionHandler<Integer, A>
 

AsynchronousSocketChannel读取数据后的读取完成通知,如写入通知的声明相同。

 

异步读和写的问题

使用通道来读取数据时,首先是从通道的缓冲区读取一定量的数据到ByteBuffer中目标缓存中,如果目标缓存没有剩余的字节可供写入,直接返回零给读取通知的completed方法的第一个参数,如果这个参数的值为-1,表示缓冲区中没有可供读取的字节或者当前通道中的套接字输入流已经关闭。因为读写都是异步的,所以一端的通道进行I/O操作后如果立即关闭,另外一个端的通道很有可能仍然处于相对应的I/O操作过程之中,这样的情形下,会抛出一个远程通道被强制关闭的IOException。因此,在通道接收数据时,需要根据completed方法的第一个参数来判断是否需要关闭当前的通道,如果不能及时关闭通道,容易引起内存泄露的问题。

 

I/O系统何时把通道缓冲区中的字节传输到远程通道中的缓冲区里,这个是由I/O系统决定,而对于异步I/O的开发人员来说,是不可控制的,假如代码在客户端向通道里写入了3个字节序列,这3个序列在远程的接收端通道里可能会被接收成3个同样的字节序列,也有可能是2个序列,也有可能是1个序列,但是不伦是接受到几个序列,这些接受到的序列的内容和发送序列的内容是一样的。

 

无论读和写,都是不是线程安全的,必须等到I/O系统通知调用者后,才可以进行下一个操作,这个在多线程环境下,需要格外小心。

 

异步Echo Server

public class AsynEchoServer {

	private int port = 9999;

	private int backlog = 50;

	private int threadPoolSize = 20;

	private int initialSize = 5;

	public void start() throws IOException {
		ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
		AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(executor, initialSize);
		AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
		listener.bind(new InetSocketAddress(port), backlog);
		listener.accept(listener, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {

			@Override
			public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel listener) {
				listener.accept(listener, this);
				ByteBuffer buffer = ByteBuffer.allocate(512);
				channel.read(buffer, buffer, new EchoHandler(channel, buffer));
			}

			@Override
			public void failed(Throwable exc, AsynchronousServerSocketChannel listener) {
				exc.printStackTrace();
				try {
					listener.close();
				} catch (IOException e) {
					e.printStackTrace();
				} finally {
					System.exit(-1);
				}
			}
		});
	}

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		AsynEchoServer server = new AsynEchoServer();
		server.start();
	}

	// getter & setters
}
 
class EchoHandler implements CompletionHandler<Integer, ByteBuffer> {
	private static Charset utf8 = Charset.forName("utf-8");
	AsynchronousSocketChannel channel;
	ByteBuffer buffer;

	public EchoHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
		this.channel = channel;
		this.buffer = buffer;
	}

	@Override
	public void completed(Integer result, ByteBuffer buff) {
		if (result == -1) {
			try {
				channel.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		} else if (result > 0) {
			buffer.flip();
			String msg = utf8.decode(buffer).toString();
			System.out.println("echo: " + msg);
			Future<Integer> w = channel.write(utf8.encode(msg));
			try {
				w.get();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
			
			buffer.clear();
			channel.read(buff, buff, this);
		}
	}

	@Override
	public void failed(Throwable exc, ByteBuffer buff) {
		// TODO Auto-generated method stub
	}
}

 

EchoClient

class EchoClient {
	private static Charset utf8 = Charset.forName("utf-8");
	private int port = 9999;
	private String remoteHost;
	private String[] message;
	private AsynchronousSocketChannel channel;

	public static void main(String args[]) throws Exception {
		if (args.length >= 2) {
			String msgs[] = new String[args.length - 1];
			System.arraycopy(args, 1, msgs, 0, msgs.length);
			EchoClient client = new EchoClient(args[0], msgs);
			client.connect();
			client.sendAndReceive();
			Thread.sleep(3000);
			client.close();
			Thread.sleep(3000);
		} else {
			System.out.println("usage EchoClient [remotehost] [messages .... ]");
		}
	}

	public void sendAndReceive() throws InterruptedException, ExecutionException {
		ByteBuffer buffer = ByteBuffer.allocate(512);
		for (String msg : this.message) {
			Future<Integer> w = channel.write(utf8.encode(msg));
			w.get();
		}
		
		channel.read(buffer, buffer, new ReceiverHandler(channel, buffer));
	}

	public void close() throws IOException {
		channel.shutdownInput();
		channel.shutdownOutput();
	}

	public EchoClient(String remoteHost, String[] message) {
		super();
		this.remoteHost = remoteHost;
		this.message = message;
	}

	public void connect() throws IOException, InterruptedException, ExecutionException {
		channel = AsynchronousSocketChannel.open();
		Future<Void> r = channel.connect(new InetSocketAddress(this.remoteHost, this.port));
		r.get();
	}

	//  getter & setters
}

 

class ReceiverHandler implements CompletionHandler<Integer, ByteBuffer> {
	private static Charset utf8 = Charset.forName("utf-8");
	private AsynchronousSocketChannel channel;
	private ByteBuffer buffer;

	@Override
	public void completed(Integer result, ByteBuffer buff) {
		
		if (result > 0) {			
			buffer.flip();
			System.out.println(utf8.decode(buffer));
			buffer.clear();
			channel.read(buff, buff, this);
		}else if (result==-1){
			try {
				channel.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	@Override
	public void failed(Throwable exc, ByteBuffer buff) {
		exc.printStackTrace();
	}

	public ReceiverHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
		super();
		this.channel = channel;
		this.buffer = buffer;
	}
}
 

 

参考资料

http://www.artima.com/lejava/articles/more_new_io.html

http://www.kegel.com/c10k.html

 

 

2
1
分享到:
评论

相关推荐

    JDK7源码(JDK7u7_src.zip)

    4. `java`:这是JDK中最核心的包,包含了所有基本数据类型、集合框架、I/O流、网络编程、多线程等关键类。例如,`java.lang`包下的`Object`类是所有Java类的基类,`String`类是处理字符串的常用工具,而`Thread`类则...

    JDK7新特性(完整篇)

    8. **JDK7新特性&lt;八&gt; 异步io/AIO** 异步I/O(Asynchronous Input/Output,AIO)提供了非阻塞的读写操作,使得应用程序在等待I/O完成时可以执行其他任务,从而提高了I/O密集型应用的性能。 综上所述,JDK7的新特性...

    jdk 7 32位免安装版

    10. **NIO.2(New IO 2)**:增加了对文件系统和文件操作的增强,如文件属性、异步I/O和文件渠道的改进。 在使用JDK 7 32位免安装版时,用户需要确保自己的操作系统是32位的,并且按照以下步骤进行配置: 1. 解压缩...

    jdk7 jdk1.7

    3. **更好的文件I/O API**:NIO.2(New IO 2.0)是Java 7中的一个重要改进,提供了异步文件操作和文件路径API,使得处理文件系统操作更加简单和高效。 4. **钻石操作符**:在创建泛型实例时,可以省略尖括号中的...

    JDK1.7(JavaSE1.7/JDK7)

    在文件系统API上,Java7引入了NIO.2(New IO 2.0),提供了更为强大和灵活的文件操作接口,如Path、Files和Paths类,支持异步I/O操作,使得文件操作更为高效。 在并发编程领域,JDK7引入了Fork/Join框架,这是一种...

    JDK7 API 中文 文档.CHM

    3. **NIO.2:**Java 7引入了**New IO (NIO.2)**,也被称为**Java SE 7 File API**,提供了更强大的文件操作能力,包括异步I/O、文件属性操作、文件系统链接以及路径操作等。 4. **文件系统API:**通过`java.nio....

    jdk1.7 java官网正式版Windows 64位-jdk-7u80-windows-x64.exe

    在文件系统操作方面,JDK 1.7引入了NIO.2(New IO 2.0),提供了更好的异步I/O支持和更高级的文件操作,如文件通道、路径API和文件属性查询等。这些新特性极大地提高了开发者处理文件系统的灵活性和效率。 在安全性...

    Linux JDK7 64位(tar.gz)

    4. **NIO.2**:Java 7增强了非阻塞I/O(New IO)框架,引入了文件系统路径API、异步文件通道以及文件属性查看等功能,使得I/O操作更加灵活高效。 5. **类型推断增强**:Java 7的编译器可以更好地推断局部变量的类型...

    官网jdk7的api文档(html版)

    Java Development Kit (JDK) 7 是 Java 编程语言的一个重要版本,它包含了开发和运行Java应用程序所需的所有工具和库。"API (Application Programming Interface) 文档"是程序员使用特定编程语言或库进行开发时的...

    jdk 7u6 api 文档(HTML)

    JDK 7u6 API文档中包含的类库广泛,涵盖了诸如集合框架、IO流、网络编程、多线程、反射、枚举、注解等多个方面。开发者可以通过以下方式利用这些文档: 1. **学习新类和方法**:API文档详尽列出了每个类的构造函数...

    Java-JDK-7.rar_jdk7

    Java JDK 7,全称Java Development Kit 7,是Oracle公司发布的用于开发Java应用程序的重要工具集,包含了Java编译器、Java虚拟机(JVM)以及各种开发工具,如Java文档生成工具、性能分析工具等。这个“Java-JDK-7....

    jdk-7u67-windows-i586-32位

    此外,它还包含了一套丰富的类库,如Java基础类库、IO流、网络编程、多线程、集合框架等。 2. **JDK 7的特性**: - **Try-with-resources语句**:自动关闭资源,提高了代码的可读性和安全性。 - **多行字符串字面...

    jdk-7windows-x64安装包

    **Java Development Kit (JDK) 7 for Windows x64** JDK 7是Java编程语言的一个重要版本,专为Windows x64操作系统设计。这个安装包包含Java Development Kit和Java Runtime Environment (JRE),两者都是Java应用...

    64位正版jdk-7u80-windows-x64(安装版)

    7. **NIO.2(New IO 2)**:提供了对文件系统操作的增强,如异步I/O,文件路径API,文件属性查询等。 8. **元空间(Metaspace)**:取代了之前版本的永久代,用于存储类元数据,减少了Full GC的发生。 **64位版本*...

    jdk 7 34位

    5. **文件系统API增强**:NIO.2(New IO 2.0)引入了更强大的文件系统操作,包括文件路径、文件属性和异步I/O等。 6. **改进的异常处理**:可以捕获多个异常类型,使得异常处理更加灵活。 7. **堆外内存分配**:Java...

    jdk1.7-7u79-windows32-i586

    在文件I/O方面,NIO.2(New IO 2.0)提供了更高级别的文件系统操作,如路径操作、文件属性查询、异步I/O等,为开发者提供了更多便利。 此外,类型推断的增强使得局部变量类型声明更简洁,如`var list = new ...

    JDK 7 API english

    JDK 7 API的核心类库包括了基础的`java.lang`、`java.util`和`java.io`包。`java.lang`包包含了所有Java程序的基本类,如`Object`、`String`和`System`。`java.util`提供了数据结构(如`ArrayList`和`HashMap`)、...

    JDK7 Reference Card 参考指南

    JDK 7 引入了真正的异步 I/O,大大提高了 I/O 操作的效率,包括: - **`AsynchronousFileChannel`**:用于异步读写文件。 - **`AsynchronousSocketChannel`**:用于异步连接到远程主机的套接字。 - **`...

    jdk-7u271-linux-x64

    "jdk-7u271-linux-x64" 是Oracle公司发布的JDK 7的第271个更新版本,针对64位Linux操作系统。这个版本的发布主要是为了修复已知的安全漏洞、性能优化和增强功能,以确保用户能在一个更稳定和安全的环境中开发和运行...

    jdk1.8API各种版本.7z

    这个压缩包"jdk1.8API各种版本.7z"显然包含了不同翻译版本的JDK1.8 API文档,这为中文开发者提供了极大的便利,因为API文档通常以英文为主,可能会对非英语母语的开发者造成理解障碍。 1. **有道翻译版本**: 有道是...

Global site tag (gtag.js) - Google Analytics