J2SE1.4以上版本中发布了全新的I/O类库。本文将通过一些实例来简单介绍NIO库提供的一些新特性:非阻塞I/O,字符转换,缓冲以及通道。
NIO包(java.nio.*)引入了四个关键的抽象数据类型,它们共同解决传统的I/O类中的一些问题。
1. Buffer:它是包含数据且用于读写的线形表结构。其中还提供了一个特殊类用于内存映射文件的I/O操作。
2. Charset:它提供Unicode字符串影射到字节序列以及逆影射的操作。
3. Channels:包含socket,file和pipe三种管道,它实际上是双向交流的通道。
4. Selector:它将多元异步I/O操作集中到一个或多个线程中(它可以被看成是Unix中select()函数或Win32中WaitForSingleEvent()函数的面向对象版本)。
二. 回顾传统
在介绍NIO之前,有必要了解传统的I/O操作的方式。以网络应用为例,传统方式需要监听一个ServerSocket,接受请求的连接为其提供服务(服务通常包括了处理请求并发送响应)图一是服务器的生命周期图,其中标有粗黑线条的部分表明会发生I/O阻塞。
图一
可以分析创建服务器的每个具体步骤。首先创建ServerSocket
ServerSocket server=new ServerSocket(10000);
然后接受新的连接请求
Socket newConnection=server.accept();
对于accept方法的调用将造成阻塞,直到ServerSocket接受到一个连接请求为止。一旦连接请求被接受,服务器可以读客户socket中的请求。
InputStream in = newConnection.getInputStream();
InputStreamReader reader = new InputStreamReader(in);
BufferedReader buffer = new BufferedReader(reader);
Request request = new Request();
while(!request.isComplete()) {
String line = buffer.readLine();
request.addLine(line);
}
这样的操作有两个问题,首先BufferedReader类的readLine()方法在其缓冲区未满时会造成线程阻塞,只有一定数据填满了缓冲区或者客户关闭了套接字,方法才会返回。其次,它回产生大量的垃圾,BufferedReader创建了缓冲区来从客户套接字读入数据,但是同样创建了一些字符串存储这些数据。虽然BufferedReader内部提供了StringBuffer处理这一问题,但是所有的String很快变成了垃圾需要回收。
同样的问题在发送响应代码中也存在
Response response = request.generateResponse();
OutputStream out = newConnection.getOutputStream();
InputStream in = response.getInputStream();
int ch;
while(-1 != (ch = in.read())) {
out.write(ch);
}
newConnection.close();
类似的,读写操作被阻塞而且向流中一次写入一个字符会造成效率低下,所以应该使用缓冲区,但是一旦使用缓冲,流又会产生更多的垃圾。
传统的解决方法
通常在Java中处理阻塞I/O要用到线程(大量的线程)。一般是实现一个线程池用来处理请求,如图二
图二
线程使得服务器可以处理多个连接,但是它们也同样引发了许多问题。每个线程拥有自己的栈空间并且占用一些CPU时间,耗费很大,而且很多时间是浪费在阻塞的I/O操作上,没有有效的利用CPU。
三. 新I/O
1. Buffer
传统的I/O不断的浪费对象资源(通常是String)。新I/O通过使用Buffer读写数据避免了资源浪费。Buffer对象是线性的,有序的数据集合,它根据其类别只包含唯一的数据类型。
java.nio.Buffer 类描述
java.nio.ByteBuffer 包含字节类型。 可以从ReadableByteChannel中读在 WritableByteChannel中写
java.nio.MappedByteBuffer 包含字节类型,直接在内存某一区域映射
java.nio.CharBuffer 包含字符类型,不能写入通道
java.nio.DoubleBuffer 包含double类型,不能写入通道
java.nio.FloatBuffer 包含float类型
java.nio.IntBuffer 包含int类型
java.nio.LongBuffer 包含long类型
java.nio.ShortBuffer 包含short类型
可以通过调用allocate(int capacity)方法或者allocateDirect(int capacity)方法分配一个Buffer。特别的,你可以创建MappedBytesBuffer通过调用FileChannel.map(int mode,long position,int size)。直接(direct)buffer在内存中分配一段连续的块并使用本地访问方法读写数据。非直接(nondirect)buffer通过使用Java中的数组访问代码读写数据。有时候必须使用非直接缓冲例如使用任何的wrap方法(如ByteBuffer.wrap(byte[]))在Java数组基础上创建buffer。
2. 字符编码
向ByteBuffer中存放数据涉及到两个问题:字节的顺序和字符转换。ByteBuffer内部通过ByteOrder类处理了字节顺序问题,但是并没有处理字符转换。事实上,ByteBuffer没有提供方法读写String。
Java.nio.charset.Charset处理了字符转换问题。它通过构造CharsetEncoder和CharsetDecoder将字符序列转换成字节和逆转换。
3. 通道(Channel)
你可能注意到现有的java.io类中没有一个能够读写Buffer类型,所以NIO中提供了Channel类来读写Buffer。通道可以认为是一种连接,可以是到特定设备,程序或者是网络的连接。通道的类等级结构图如下
图三
图中ReadableByteChannel和WritableByteChannel分别用于读写。
GatheringByteChannel可以从使用一次将多个Buffer中的数据写入通道,相反的,ScatteringByteChannel则可以一次将数据从通道读入多个Buffer中。你还可以设置通道使其为阻塞或非阻塞I/O操作服务。
为了使通道能够同传统I/O类相容,Channel类提供了静态方法创建Stream或Reader
4. Selector
在过去的阻塞I/O中,我们一般知道什么时候可以向stream中读或写,因为方法调用直到stream准备好时返回。但是使用非阻塞通道,我们需要一些方法来知道什么时候通道准备好了。在NIO包中,设计Selector就是为了这个目的。SelectableChannel可以注册特定的事件,而不是在事件发生时通知应用,通道跟踪事件。然后,当应用调用Selector上的任意一个selection方法时,它查看注册了的通道看是否有任何感兴趣的事件发生。图四是selector和两个已注册的通道的例子
图四
并不是所有的通道都支持所有的操作。SelectionKey类定义了所有可能的操作位,将要用两次。首先,当应用调用SelectableChannel.register(Selector sel,int op)方法注册通道时,它将所需操作作为第二个参数传递到方法中。然后,一旦SelectionKey被选中了,SelectionKey的readyOps()方法返回所有通道支持操作的数位的和。SelectableChannel的validOps方法返回每个通道允许的操作。注册通道不支持的操作将引发IllegalArgumentException异常。下表列出了SelectableChannel子类所支持的操作。
ServerSocketChannel OP_ACCEPT
SocketChannel OP_CONNECT, OP_READ, OP_WRITE
DatagramChannel OP_READ, OP_WRITE
Pipe.SourceChannel OP_READ
Pipe.SinkChannel OP_WRITE
1. 简单网页内容下载
这个例子非常简单,类SocketChannelReader使用SocketChannel来下载特定网页的HTML内容。
package examples.nio;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.net.InetSocketAddress;
import java.io.IOException;
private Charset charset=Charset.forName("UTF-8");//创建UTF-8字符集
private SocketChannel channel;
try{
connect();
sendRequest();
readResponse();
}catch(IOException e){
System.err.println(e.toString());
}finally{
if(channel!=null){
try{
channel.close();
}catch(IOException e){}
}
}
}
private void connect()throws IOException{//连接到CSDN
InetSocketAddress socketAddress=
new InetSocketAddress("http://www.csdn.net",80/);
channel=SocketChannel.open(socketAddress);
//使用工厂方法open创建一个channel并将它连接到指定地址上
//相当与SocketChannel.open().connect(socketAddress);调用
}
channel.write(charset.encode("GET "
+"/document"
+"\r\n\r\n"));//发送GET请求到CSDN的文档中心
//使用channel.write方法,它需要CharByte类型的参数,使用
//Charset.encode(String)方法转换字符串。
}
ByteBuffer buffer=ByteBuffer.allocate(1024);//创建1024字节的缓冲
while(channel.read(buffer)!=-1){
buffer.flip();//flip方法在读缓冲区字节操作之前调用。
System.out.println(charset.decode(buffer));
//使用Charset.decode方法将字节转换为字符串
buffer.clear();//清空缓冲
}
}
new SocketChannelReader().getHTMLContent();
}
2. 简单的加法服务器和客户机
服务器代码
package examples.nio;
import java.nio.IntBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.net.InetSocketAddress;
import java.io.IOException;
* SumServer.java
*
*
* Created: Thu Nov 06 11:41:52 2003
*
* @author starchu1981
* @version 1.0
*/
public class SumServer {
private IntBuffer _intBuffer=_buffer.asIntBuffer();
private SocketChannel _clientChannel=null;
private ServerSocketChannel _serverChannel=null;
try{
openChannel();
waitForConnection();
}catch(IOException e){
System.err.println(e.toString());
}
}
_serverChannel=ServerSocketChannel.open();
_serverChannel.socket().bind(new InetSocketAddress(10000));
System.out.println("服务器通道已经打开");
}
while(true){
_clientChannel=_serverChannel.accept();
if(_clientChannel!=null){
System.out.println("新的连接加入");
processRequest();
_clientChannel.close();
}
}
}
_buffer.clear();
_clientChannel.read(_buffer);
int result=_intBuffer.get(0)+_intBuffer.get(1);
_buffer.flip();
_buffer.clear();
_intBuffer.put(0,result);
_clientChannel.write(_buffer);
}
new SumServer().start();
}
} // SumServer
客户代码
package examples.nio;
import java.nio.IntBuffer;
import java.nio.channels.SocketChannel;
import java.net.InetSocketAddress;
import java.io.IOException;
* SumClient.java
*
*
* Created: Thu Nov 06 11:26:06 2003
*
* @author starchu1981
* @version 1.0
*/
public class SumClient {
private IntBuffer _intBuffer;
private SocketChannel _channel;
_intBuffer=_buffer.asIntBuffer();
} // SumClient constructor
public int getSum(int first,int second){
int result=0;
try{
_channel=connect();
sendSumRequest(first,second);
result=receiveResponse();
}catch(IOException e){System.err.println(e.toString());
}finally{
if(_channel!=null){
try{
_channel.close();
}catch(IOException e){}
}
}
return result;
}
InetSocketAddress socketAddress=
new InetSocketAddress("localhost",10000);
return SocketChannel.open(socketAddress);
}
private void sendSumRequest(int first,int second)throws IOException{
_buffer.clear();
_intBuffer.put(0,first);
_intBuffer.put(1,second);
_channel.write(_buffer);
System.out.println("发送加法请求 "+first+"+"+second);
}
private int receiveResponse()throws IOException{
_buffer.clear();
_channel.read(_buffer);
return _intBuffer.get(0);
}
SumClient sumClient=new SumClient();
System.out.println("加法结果为 :"+sumClient.getSum(100,324));
}
} // SumClient
3. 非阻塞的加法服务器
首先在openChannel方法中加入语句
_serverChannel.configureBlocking(false);//设置成为非阻塞模式
private void waitForConnection()throws IOException{
Selector acceptSelector = SelectorProvider.provider().openSelector();
这就告诉Selector,套接字想要在accept操作发生时被放在ready表
上,因此,允许多元非阻塞I/O发生。*/
SelectionKey acceptKey = ssc.register(acceptSelector,
SelectionKey.OP_ACCEPT);
int keysAdded = 0;
/*select方法在任何上面注册了的操作发生时返回*/
while ((keysAdded = acceptSelector.select()) > 0) {
// 某客户已经准备好可以进行I/O操作了,获取其ready键集合
Set readyKeys = acceptSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey sk = (SelectionKey)i.next();
i.remove();
ServerSocketChannel nextReady =
(ServerSocketChannel)sk.channel();
// 接受加法请求并处理它
_clientSocket = nextReady.accept().socket();
processRequest();
_clientSocket.close();
}
}
}
1. <Master Merlin's new I/O classes> From <http://www.javawordl.com/>
2. J2SE1.4.2 API Specification From <http://java.sun.com/>
3. <Working with SocketChannels> From <http://developer.java.sun.com/developer>
4. NIO Examples From <http://java.sun.com/>
nio是new io的简称,从jdk1.4就被引入了。现在的jdk已经到了1.6了,可以说不是什么新东西了。但其中的一些思想值得我来研究。这两天,我研究了下其中的套接字部分,有一些心得,在此分享。 首先先分析下:为什么要nio套接字? nio的主要作用就是用来解决速度差异的。举个例子:计算机处理的速度,和用户按键盘的速度。这两者的速度相差悬殊。如果按照经典的方法:一个用户设定一个线程,专门等待用户的输入,无形中就造成了严重的资源浪费:每一个线程都需要珍贵的cpu时间片,由于速度差异造成了在这个交互线程中的cpu都用来等待。 nio套接字是怎么做到的? 其实,其中的思想很简单:轮询。一个线程轮询多个input;传统的方式是:有n个客户端就要有n个服务线程+一个监听线程,现在采取这种凡是,可以仅仅使用1个线程来代替n个服务线程以此来解决。 具体应用例子: 在ftp的控制连接中,因为只有少量的字符命令进行传输,所以可以考虑利用这种轮询的方式实现,以节省资源。 具体见例子。 Java代码
package com.cxz.io;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class IoEchoServer implements Runnable {
// ThreadLocal<Socket> localSocket = new ThreadLocal<Socket>();
Map<String, Socket> socketMap = Collections
.synchronizedMap(new HashMap<String, Socket>());
int threadCounter = 0;
synchronized private int getCounter() {
return threadCounter++;
}
public IoEchoServer() throws IOException {
ServerSocket server = new ServerSocket(1984);
while (true) {
Socket socket = server.accept();
// happened in the main thread.
// localSocket.set(socket);
String threadName = "---Thread" + getCounter() + "---";
socketMap.put(threadName, socket);
this.start(threadName);
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
new IoEchoServer();
}
public void run() {
try {
Socket socket = socketMap.get(Thread.currentThread().getName());
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//PrintWriter out = new PrintWriter(socket.getOutputStream());
String buffer = null;
while(!"END".equals(buffer)){
buffer = in.readLine();
System.out.println(buffer);
}
in.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void start(String threadName) {
new Thread(this, threadName).start();
}
}
下面这个例子采取了nio方式实现,虽然还是有阻塞部分,但是与上一个相比,效率已经大幅提高。仅仅阻塞到一个监听线程中。 Java代码
package com.cxz.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;
public class NioEchoServer {
private static Selector roller = null;
private static final int port = 8080;
private static NioEchoServer instance = null;
private ThreadLocal<StringBuffer> stringLocal = new ThreadLocal<StringBuffer>();
private NioEchoServer() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(port));
serverChannel.configureBlocking(false);
serverChannel.register(roller, SelectionKey.OP_ACCEPT);
}
public synchronized static NioEchoServer getInstance() throws IOException {
if (instance == null) {
roller = Selector.open();
instance = new NioEchoServer();
}
return instance;
}
public void start() throws IOException {
int keyAdded = 0;
while ((keyAdded = roller.select()) > 0) {
Set<SelectionKey> keySets = roller.selectedKeys();
Iterator iter = keySets.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
actionHandler(key);
}
}
}
public void actionHandler(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key
.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(roller, SelectionKey.OP_READ);
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.read(buffer);
buffer.flip();
String temp = decode(buffer);
StringBuffer strBuffer = stringLocal.get();
if (strBuffer == null) {
strBuffer = new StringBuffer();
}
strBuffer.append(temp);
if (temp.equals("\r\n")) {
System.out.println(strBuffer.toString());
strBuffer = null;
}
stringLocal.set(strBuffer);
}
}
public String decode(ByteBuffer buffer) {
Charset charset = null;
CharsetDecoder decoder = null;
CharBuffer charBuffer = null;
try {
charset = Charset.forName("UTF-8");
decoder = charset.newDecoder();
charBuffer = decoder.decode(buffer);
return charBuffer.toString();
} catch (Exception ex) {
ex.printStackTrace();
return "";
}
}
public static void main(String[] args) {
try {
NioEchoServer.getInstance().start();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
当学习了Java NIO和IO的API后,一个问题很快进入心中:
我应该在何时使用IO,何时使用NIO呢?
在本文中,我会尽量清晰地阐明Java NIO和IO的差异、它们的用例,以及它们如何影响您的代码设计。
Java NIO和IO的主要差异
下表总结了Java NIO和IO之间的主要差别,我会更详细地描述表中每部分的差异。
IO NIO
面向流 面向缓冲
阻塞的IO 非阻塞IO
选择器
面向流与面向缓冲
Java NIO和IO之间第一个最大的区别是,IO是面向流的,其中NIO是面向缓冲区的。那么,者是什么意思?
Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
Java NIO的缓冲导向方法略有不同。数据读取到一个它以后处理的缓冲区,需要时,可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查,是否该缓冲区包含所有您需要充分处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖尚未处理的缓冲区数据。
阻塞与非阻塞IO
Java IO的各种流是阻塞的。这意味着,当一线程调用读read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情。
Java NIO的非阻塞模式,使一个线程从某通道请求读取数据,仅得目前可用的,或若目前没有数据可用时,什么都没有。而不是保持线程阻塞,直至数据变得可供读取,该线程可以继续其他的事情。
非阻塞写也是如此。一线程请求写入一些数据到某通道,但不等待它完全写入,然后该线程同时可以去做别的事情。
线程将不是阻塞的IO空闲时间调用通常花费在其它通道在此期间的IO执行上,亦即,一个单独的线程现在可以管理多个输入和输出通道。
选择器
Java NIO的选择器允许一个单独的线程来监视多个输入通道,可以注册一个选择器的多个通道,然后使用一个单独的线程“选择”通道:已有输入可用于处理,或某通道已准备写入。这种选择机制,使得一个单独的线程很容易来管理多个通道。
NIO和IO如何影响应用程序的设计
无论您选择IO或NIO工具箱,可能会影响您应用程序设计的以下几个方面:
1. 对NIO或IO类的API调用。
2. 数据处理。
3. 用来处理数据的线程数。
API调用
当然,使用NIO的API调用时看起来与使用IO时有所不同,这也难怪,不是仅仅从一个InputStream逐字节读取,数据必须先读入缓冲区,然后从那里处理。
数据处理
使用纯粹的NIO设计相较IO设计,数据处理也受到影响。
在一IO设计,从一InputStream或 Reader逐字节读取。想象一下,正在处理一基于文本数据的行流,例如:
Name: Anna
Age: 25
Email: anna@mailserver.com
Phone: 1234567890
该文本行的流可以这样处理:
请注意处理状态由程序执行多久决定确定。换句话说,一旦reader.readLine()方法返回,你就知道肯定文本行就已读完, readline()阻塞直到整行读完,此即原因。你也知道此行包含名称;同样,第二个readline()调用返回的时候,你知道这行包含年龄等。
正如你可以看到,该处理程序仅在有新数据读入时运行,并知道每步的数据是什么。一旦正在运行的县城已处理过读入的某些数据,该线程不会再回退数据(大多如此)。下图也说明了这条原则:
Java IO: 从阻塞流中读取数据
一个NIO实现会有所不同。下面是一个简单的例子:
注意第二行,从通道读取字节到ByteBuffer。当这个方法调用返回时,你不知道你所需的所有数据是否在缓冲区内。你所知道的是,该缓冲区包含一些字节,这使得处理有点困难。
想象一下,如果第一次 read(buffer)调用后,所有读入缓冲区为半行,例如,“Name:An”,你能处理数据吗?显然不能,需要等待,直到整行数据读入缓存,在此之前,对数据的任何处理毫无意义。
所以,你怎么知道是否该缓冲区包含足够的数据可以处理呢?好了,你不知道。发现的方法只能查看缓冲区中的数据。其结果是,在知道数据是否在其中前,需要检查缓冲区几次。这不仅效率低下,而且可以使方案设计杂乱不堪。例如:
bufferFull()方法必须跟踪有多少数据读入缓冲区,并返回真或假,取决于缓冲区是否已满。换句话说,如果缓冲区准备好被处理,它被认为是满了。
bufferFull()方法扫描缓冲区,但必须保持在bufferFull()方法被调用之前状态相同。如果没有,下一个读入缓冲区的数据可能无法读到正确的位置。这是不可能的,但却是需要注意的又一问题。
如果缓冲区已满,它可以被处理。如果它不满,在特定的情况下,也许可以部分过处理其中存在的任何数据。许多情况下并非如此。
下图展示了“缓冲区数据循环就绪”:
Java NIO: 从通道读取数据直至所需数据在缓存区中
摘要
NIO可让您管理只使用一个(或几个)单线程管理多个通道(网络连接或文件),但付出的代价是,解析数据可能会比从一个阻塞流中读取数据时较为复杂。
如果需要管理成千上万打开的连接共存,每次只能发送小部分数据,例如聊天服务器,实现NIO的服务器可能是一个优势。同样,如果你需要保持许多打开的连接到其他计算机上,如P2P网络中,使用一个单独的线程来管理你所有出站连接,可能是一个优势。一个线程多个连接的设计方案如下图所示:
Java NIO: 单线程管理多连接
如果你有非常高的带宽更少的连接,一次发送大量的数据,也许典型的IO服务器实现可能的最佳契合。下图说明了一个典型的IO服务器设计:
Java IO: 典型IO 设计 –每线程处理一个连接
Java NIO原理图文分析及代码实现
前言:
最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。可以参考:http://baike.baidu.com/view/32726.htm )机制时,发现hadoop的RPC机制的实现主要用到了两个技术:动态代理(动态代理可以参考博客:http://weixiaolu.iteye.com/blog/1477774 )和java NIO。为了能够正确地分析hadoop的RPC源码,我觉得很有必要先研究一下java NIO的原理和具体实现。
这篇博客我主要从两个方向来分析java NIO
目录:
一.java NIO 和阻塞I/O的区别
1. 阻塞I/O通信模型
2. java NIO原理及通信模型
二.java NIO服务端和客户端代码实现
具体分析:
一.java NIO 和阻塞I/O的区别
1. 阻塞I/O通信模型
假如现在你对阻塞I/O已有了一定了解,我们知道阻塞I/O在调用InputStream.read()方法时是阻塞的,它会一直等到数据到来时(或超时)才会返回;同样,在调用ServerSocket.accept()方法时,也会一直阻塞到有客户端连接才会返回,每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。阻塞I/O的通信模型示意图如下:
如果你细细分析,一定会发现阻塞I/O存在一些缺点。根据阻塞I/O通信模型,我总结了它的两点缺点:
1. 当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些CPU时间
2. 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。
在这种情况下非阻塞式I/O就有了它的应用前景。
2. java NIO原理及通信模型
Java NIO是在jdk1.4开始使用的,它既可以说成“新I/O”,也可以说成非阻塞式I/O。下面是java NIO的工作原理:
1. 由一个专门的线程来处理所有的 IO 事件,并负责分发。
2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。
阅读过一些资料之后,下面贴出我理解的java NIO的工作原理图:
(注:每个线程的处理流程大概都是读取数据、解码、计算处理、编码、发送响应。)
Java NIO的服务端只需启动一个专门的线程来处理所有的 IO 事件,这种通信模型是怎么实现的呢?呵呵,我们一起来探究它的奥秘吧。java NIO采用了双向通道(channel)进行数据传输,而不是单向的流(stream),在通道上可以注册我们感兴趣的事件。一共有以下四种事件:
事件名 | 对应值 |
服务端接收客户端连接事件 | SelectionKey.OP_ACCEPT(16) |
客户端连接服务端事件 | SelectionKey.OP_CONNECT(8) |
读事件 | SelectionKey.OP_READ(1) |
写事件 | SelectionKey.OP_WRITE(4) |
服务端和客户端各自维护一个管理通道的对象,我们称之为selector,该对象能检测一个或多个通道 (channel) 上的事件。我们以服务端为例,如果服务端的selector上注册了读事件,某时刻客户端给服务端发送了一些数据,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端会在selector中添加一个读事件。服务端的处理线程会轮询地访问selector,如果访问selector时发现有感兴趣的事件到达,则处理这些事件,如果没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。下面是我理解的java NIO的通信模型示意图:
二.java NIO服务端和客户端代码实现
为了更好地理解java NIO,下面贴出服务端和客户端的简单代码实现。
服务端:
- package cn.nio;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- /**
- * NIO服务端
- * @author 小路
- */
- public class NIOServer {
- //通道管理器
- private Selector selector;
- /**
- * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
- * @param port 绑定的端口号
- * @throws IOException
- */
- public void initServer(int port) throws IOException {
- // 获得一个ServerSocket通道
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- // 设置通道为非阻塞
- serverChannel.configureBlocking(false);
- // 将该通道对应的ServerSocket绑定到port端口
- serverChannel.socket().bind(new InetSocketAddress(port));
- // 获得一个通道管理器
- this.selector = Selector.open();
- //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
- //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
- serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- }
- /**
- * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- public void listen() throws IOException {
- System.out.println("服务端启动成功!");
- // 轮询访问selector
- while (true) {
- //当注册的事件到达时,方法返回;否则,该方法会一直阻塞
- selector.select();
- // 获得selector中选中的项的迭代器,选中的项为注册的事件
- Iterator ite = this.selector.selectedKeys().iterator();
- while (ite.hasNext()) {
- SelectionKey key = (SelectionKey) ite.next();
- // 删除已选的key,以防重复处理
- ite.remove();
- // 客户端请求连接事件
- if (key.isAcceptable()) {
- ServerSocketChannel server = (ServerSocketChannel) key
- .channel();
- // 获得和客户端连接的通道
- SocketChannel channel = server.accept();
- // 设置成非阻塞
- channel.configureBlocking(false);
- //在这里可以给客户端发送信息哦
- channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
- //在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
- channel.register(this.selector, SelectionKey.OP_READ);
- // 获得了可读的事件
- } else if (key.isReadable()) {
- read(key);
- }
- }
- }
- }
- /**
- * 处理读取客户端发来的信息 的事件
- * @param key
- * @throws IOException
- */
- public void read(SelectionKey key) throws IOException{
- // 服务器可读取消息:得到事件发生的Socket通道
- SocketChannel channel = (SocketChannel) key.channel();
- // 创建读取的缓冲区
- ByteBuffer buffer = ByteBuffer.allocate(10);
- channel.read(buffer);
- byte[] data = buffer.array();
- String msg = new String(data).trim();
- System.out.println("服务端收到信息:"+msg);
- ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
- channel.write(outBuffer);// 将消息回送给客户端
- }
- /**
- * 启动服务端测试
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- NIOServer server = new NIOServer();
- server.initServer(8000);
- server.listen();
- }
- }
客户端:
- package cn.nio;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- /**
- * NIO客户端
- * @author 小路
- */
- public class NIOClient {
- //通道管理器
- private Selector selector;
- /**
- * 获得一个Socket通道,并对该通道做一些初始化的工作
- * @param ip 连接的服务器的ip
- * @param port 连接的服务器的端口号
- * @throws IOException
- */
- public void initClient(String ip,int port) throws IOException {
- // 获得一个Socket通道
- SocketChannel channel = SocketChannel.open();
- // 设置通道为非阻塞
- channel.configureBlocking(false);
- // 获得一个通道管理器
- this.selector = Selector.open();
- // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
- //用channel.finishConnect();才能完成连接
- channel.connect(new InetSocketAddress(ip,port));
- //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
- channel.register(selector, SelectionKey.OP_CONNECT);
- }
- /**
- * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- public void listen() throws IOException {
- // 轮询访问selector
- while (true) {
- selector.select();
- // 获得selector中选中的项的迭代器
- Iterator ite = this.selector.selectedKeys().iterator();
- while (ite.hasNext()) {
- SelectionKey key = (SelectionKey) ite.next();
- // 删除已选的key,以防重复处理
- ite.remove();
- // 连接事件发生
- if (key.isConnectable()) {
- SocketChannel channel = (SocketChannel) key
- .channel();
- // 如果正在连接,则完成连接
- if(channel.isConnectionPending()){
- channel.finishConnect();
- }
- // 设置成非阻塞
- channel.configureBlocking(false);
- //在这里可以给服务端发送信息哦
- channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
- //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
- channel.register(this.selector, SelectionKey.OP_READ);
- // 获得了可读的事件
- } else if (key.isReadable()) {
- read(key);
- }
- }
- }
- }
- /**
- * 处理读取服务端发来的信息 的事件
- * @param key
- * @throws IOException
- */
- public void read(SelectionKey key) throws IOException{
- //和服务端的read方法一样
- }
- /**
- * 启动客户端测试
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- NIOClient client = new NIOClient();
- client.initClient("localhost",8000);
- client.listen();
- }
- }
小结:
终于把动态代理和java NIO分析完了,呵呵,下面就要分析hadoop的RPC机制源码了,博客地址:http://weixiaolu.iteye.com/blog/1504898 。不过如果对java NIO的理解存在异议的,欢迎一起讨论。
如需转载,请注明出处:http://weixiaolu.iteye.com/blog/1479656
NIO API 主要集中在 java.nio 和它的 subpackages 中:
java.nio
定义了 Buffer 及其数据类型相关的子类。其中被 java.nio.channels 中的类用来进行 IO 操作的 ByteBuffer 的作用非常重要。
java.nio.channels
定义了一系列处理 IO 的 Channel 接口以及这些接口在文件系统和网络通讯上的实现。通过 Selector 这个类,还提供了进行非阻塞 IO 操作的办法。这个包可以说是NIO API 的核心。
java.nio.channels.spi
定义了可用来实现 channel 和 selector API 的抽象类。
java.nio.charset
定义了处理字符编码和解码的类。
java.nio.charset.spi
定义了可用来实现 charset API 的抽象类。
java.nio.channels.spi 和 java.nio.charset.spi 这两个包主要被用来对现有 NIO API 进行扩展,在实际的使用中,我们一般只和另外的 3 个包打交道。下面将对这 3 个包一一介绍。
Package java.nio
这个包主要定义了 Buffer 及其子类。 Buffer 定义了一个线性存放 primitive type 数据的容器接口。对于除 boolean 以外的其他 primitive type ,都有一个相应的 Buffer子类, ByteBuffer 是其中最重要的一个子类。
下面这张 UML 类图描述了 java.nio 中的类的关系:
Buffer
定义了一个可以线性存放 primitive type 数据的容器接口。 Buffer 主要包含了与类型( byte, char… )无关的功能。值得注意的是 Buffer 及其子类都不是线程安全的。
每个 Buffer 都有以下的属性:
capacity
这个 Buffer 最多能放多少数据。 capacity 一般在 buffer 被创建的时候指定。
limit
在 Buffer 上进行的读写操作都不能越过这个下标。当写数据到 buffer 中时, limit 一般和 capacity 相等,当读数据时, limit 代表 buffer 中有效数据的长度。
position
读 / 写操作的当前下标。当使用 buffer 的相对位置进行读 / 写操作时,读 / 写会从这个下标进行,并在操作完成后, buffer 会更新下标的值。
mark
一个临时存放的位置下标。调用 mark() 会将 mark 设为当前的 position 的值,以后调用 reset() 会将 position 属性设置为 mark 的值。 mark 的值总是小于等于position 的值,如果将 position 的值设的比 mark 小,当前的 mark 值会被抛弃掉。
这些属性总是满足以下条件:
0 <= mark <= position <= limit <= capacity
limit 和 position 的值除了通过 limit() 和 position() 函数来设置,也可以通过下面这些函数来改变:
Buffer clear()
把 position 设为 0 ,把 limit 设为 capacity ,一般在把数据写入 Buffer 前调用。
Buffer flip()
把 limit 设为当前 position ,把 position 设为 0 ,一般在从 Buffer 读出数据前调用。
Buffer rewind()
把 position 设为 0 , limit 不变,一般在把数据重写入 Buffer 前调用。
Buffer 对象有可能是只读的,这时,任何对该对象的写操作都会触发一个 ReadOnlyBufferException 。 isReadOnly() 方法可以用来判断一个 Buffer 是否只读。
ByteBuffer
在 Buffer 的子类中, ByteBuffer 是一个地位较为特殊的类,因为在 java.io.channels 中定义的各种 channel 的 IO 操作基本上都是围绕 ByteBuffer 展开的。
ByteBuffer 定义了 4 个 static 方法来做创建工作:
ByteBuffer allocate(int capacity)
创建一个指定 capacity 的 ByteBuffer 。
ByteBuffer allocateDirect(int capacity)
创建一个 direct 的 ByteBuffer ,这样的 ByteBuffer 在参与 IO 操作时性能会更好(很有可能是在底层的实现使用了 DMA 技术),相应的,创建和回收 direct 的ByteBuffer 的代价也会高一些。 isDirect() 方法可以检查一个 buffer 是否是 direct 的。
ByteBuffer wrap(byte [] array)
ByteBuffer wrap(byte [] array, int offset, int length)
把一个 byte 数组或 byte 数组的一部分包装成 ByteBuffer 。
ByteBuffer 定义了一系列 get 和 put 操作来从中读写 byte 数据,如下面几个:
byte get()
ByteBuffer get(byte [] dst)
byte get(int index)
ByteBuffer put(byte b)
ByteBuffer put(byte [] src)
ByteBuffer put(int index, byte b)
这些操作可分为绝对定位和相对定为两种,相对定位的读写操作依靠 position 来定位 Buffer 中的位置,并在操作完成后会更新 position 的值。
在其它类型的 buffer 中,也定义了相同的函数来读写数据,唯一不同的就是一些参数和返回值的类型。
除了读写 byte 类型数据的函数, ByteBuffer 的一个特别之处是它还定义了读写其它 primitive 数据的方法,如:
int getInt()
从 ByteBuffer 中读出一个 int 值。
ByteBuffer putInt(int value)
写入一个 int 值到 ByteBuffer 中。
读写其它类型的数据牵涉到字节序问题, ByteBuffer 会按其字节序(大字节序或小字节序)写入或读出一个其它类型的数据( int,long… )。字节序可以用 order 方法来取得和设置:
ByteOrder order()
返回 ByteBuffer 的字节序。
ByteBuffer order(ByteOrder bo)
设置 ByteBuffer 的字节序。
ByteBuffer 另一个特别的地方是可以在它的基础上得到其它类型的 buffer 。如:
CharBuffer asCharBuffer()
为当前的 ByteBuffer 创建一个 CharBuffer 的视图。在该视图 buffer 中的读写操作会按照 ByteBuffer 的字节序作用到 ByteBuffer 中的数据上。
用这类方法创建出来的 buffer 会从 ByteBuffer 的 position 位置开始到 limit 位置结束,可以看作是这段数据的视图。视图 buffer 的 readOnly 属性和 direct 属性与ByteBuffer 的一致,而且也只有通过这种方法,才可以得到其他数据类型的 direct buffer 。
ByteOrder
用来表示 ByteBuffer 字节序的类,可将其看成 java 中的 enum 类型。主要定义了下面几个 static 方法和属性:
ByteOrder BIG_ENDIAN
代表大字节序的 ByteOrder 。
ByteOrder LITTLE_ENDIAN
代表小字节序的 ByteOrder 。
ByteOrder nativeOrder()
返回当前硬件平台的字节序。
MappedByteBuffer
ByteBuffer 的子类,是文件内容在内存中的映射。这个类的实例需要通过 FileChannel 的 map() 方法来创建。
接下来看看一个使用 ByteBuffer 的例子,这个例子从标准输入不停地读入字符,当读满一行后,将收集的字符写到标准输出:
public static void main(String [] args) throws IOException { // 创建一个 capacity 为 256 的 ByteBuffer ByteBuffer buf = ByteBuffer.allocate(256); while ( true ) { // 从标准输入流读入一个字符 int c = System.in.read(); // 当读到输入流结束时,退出循环 if (c == -1) break ;
// 把读入的字符写入 ByteBuffer 中 buf.put(( byte ) c); // 当读完一行时,输出收集的字符 if (c == '\n' ) { // 调用 flip() 使 limit 变为当前的 position 的值 ,position 变为 0, // 为接下来从 ByteBuffer 读取做准备 buf.flip(); // 构建一个 byte 数组 byte [] content = new byte [buf.limit()]; // 从 ByteBuffer 中读取数据到 byte 数组中 buf.get(content); // 把 byte 数组的内容写到标准输出 System.out.print( new String(content)); // 调用 clear() 使 position 变为 0,limit 变为 capacity 的值, // 为接下来写入数据到 ByteBuffer 中做准备 buf.clear(); } } } |
Package java.nio.channels
这个包定义了 Channel 的概念, Channel 表现了一个可以进行 IO 操作的通道(比如,通过 FileChannel ,我们可以对文件进行读写操作)。 java.nio.channels 包含了文件系统和网络通讯相关的 channel 类。这个包通过 Selector 和 SelectableChannel 这两个类,还定义了一个进行非阻塞( non-blocking ) IO 操作的 API ,这对需要高性能 IO 的应用非常重要。
下面这张 UML 类图描述了 java.nio.channels 中 interface 的关系:
Channel
Channel 表现了一个可以进行 IO 操作的通道,该 interface 定义了以下方法:
boolean isOpen()
该 Channel 是否是打开的。
void close()
关闭这个 Channel ,相关的资源会被释放。
ReadableByteChannel
定义了一个可从中读取 byte 数据的 channel interface 。
int read(ByteBuffer dst)
从 channel 中读取 byte 数据并写到 ByteBuffer 中。返回读取的 byte 数。
WritableByteChannel
定义了一个可向其写 byte 数据的 channel interface 。
int write(ByteBuffer src)
从 ByteBuffer 中读取 byte 数据并写到 channel 中。返回写出的 byte 数。
ByteChannel
ByteChannel 并没有定义新的方法,它的作用只是把 ReadableByteChannel 和 WritableByteChannel 合并在一起。
ScatteringByteChannel
继承了 ReadableByteChannel 并提供了同时往几个 ByteBuffer 中写数据的能力。
GatheringByteChannel
继承了 WritableByteChannel 并提供了同时从几个 ByteBuffer 中读数据的能力。
InterruptibleChannel
用来表现一个可以被异步关闭的 Channel 。这表现在两方面:
1. 当一个 InterruptibleChannel 的 close() 方法被调用时,其它 block 在这个 InterruptibleChannel 的 IO 操作上的线程会接收到一个AsynchronousCloseException 。
2. 当一个线程 block 在 InterruptibleChannel 的 IO 操作上时,另一个线程调用该线程的 interrupt() 方法会导致 channel 被关闭,该线程收到一个ClosedByInterruptException ,同时线程的 interrupt 状态会被设置。
接下来的这张 UML 类图描述了 java.nio.channels 中类的关系:
非阻塞 IO
非阻塞 IO 的支持可以算是 NIO API 中最重要的功能,非阻塞 IO 允许应用程序同时监控多个 channel 以提高性能,这一功能是通过 Selector , SelectableChannel 和SelectionKey 这 3 个类来实现的。
SelectableChannel 代表了可以支持非阻塞 IO 操作的 channel ,可以将其注册在 Selector 上,这种注册的关系由 SelectionKey 这个类来表现(见 UML 图)。 Selector这个类通过 select() 函数,给应用程序提供了一个可以同时监控多个 IO channel 的方法:
应用程序通过调用 select() 函数,让 Selector 监控注册在其上的多个 SelectableChannel ,当有 channel 的 IO 操作可以进行时, select() 方法就会返回以让应用程序检查 channel 的状态,并作相应的处理。
下面是 JDK 1.4 中非阻塞 IO 的一个例子,这段 code 使用了非阻塞 IO 实现了一个 time server :
private static void acceptConnections( int port) throws Exception { // 打开一个 Selector Selector acceptSelector = SelectorProvider.provider().openSelector();
// 创建一个 ServerSocketChannel ,这是一个 SelectableChannel 的子类 ServerSocketChannel ssc = ServerSocketChannel.open(); // 将其设为 non-blocking 状态,这样才能进行非阻塞 IO 操作 ssc.configureBlocking( false );
// 给 ServerSocketChannel 对应的 socket 绑定 IP 和端口 InetAddress lh = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(lh, port); ssc.socket().bind(isa);
// 将 ServerSocketChannel 注册到 Selector 上,返回对应的 SelectionKey SelectionKey acceptKey = ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);
int keysAdded = 0;
// 用 select() 函数来监控注册在 Selector 上的 SelectableChannel // 返回值代表了有多少 channel 可以进行 IO 操作 (ready for IO) while ((keysAdded = acceptSelector.select()) > 0) { // selectedKeys() 返回一个 SelectionKey 的集合, // 其中每个 SelectionKey 代表了一个可以进行 IO 操作的 channel 。 // 一个 ServerSocketChannel 可以进行 IO 操作意味着有新的 TCP 连接连入了 Set readyKeys = acceptSelector.selectedKeys(); Iterator i = readyKeys.iterator();
while (i.hasNext()) { SelectionKey sk = (SelectionKey) i.next(); // 需要将处理过的 key 从 selectedKeys 这个集合中删除 i.remove(); // 从 SelectionKey 得到对应的 channel ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel(); // 接受新的 TCP 连接 Socket s = nextReady.accept().socket(); // 把当前的时间写到这个新的 TCP 连接中 PrintWriter out = new PrintWriter(s.getOutputStream(), true ); Date now = new Date(); out.println(now); // 关闭连接 out.close(); } } } |
这是个纯粹用于演示的例子,因为只有一个 ServerSocketChannel 需要监控,所以其实并不真的需要使用到非阻塞 IO 。不过正因为它的简单,可以很容易地看清楚非阻塞 IO 是如何工作的。
SelectableChannel
这个抽象类是所有支持非阻塞 IO 操作的 channel (如 DatagramChannel 、 SocketChannel )的父类。 SelectableChannel 可以注册到一个或多个 Selector 上以进行非阻塞 IO 操作。
SelectableChannel 可以是 blocking 和 non-blocking 模式(所有 channel 创建的时候都是 blocking 模式),只有 non-blocking 的 SelectableChannel 才可以参与非阻塞 IO 操作。
SelectableChannel configureBlocking(boolean block)
设置 blocking 模式。
boolean isBlocking()
返回 blocking 模式。
通过 register() 方法, SelectableChannel 可以注册到 Selector 上。
int validOps()
返回一个 bit mask ,表示这个 channel 上支持的 IO 操作。当前在 SelectionKey 中,用静态常量定义了 4 种 IO 操作的 bit 值: OP_ACCEPT , OP_CONNECT ,OP_READ 和 OP_WRITE 。
SelectionKey register(Selector sel, int ops)
将当前 channel 注册到一个 Selector 上并返回对应的 SelectionKey 。在这以后,通过调用 Selector 的 select() 函数就可以监控这个 channel 。 ops 这个参数是一个 bit mask ,代表了需要监控的 IO 操作。
SelectionKey register(Selector sel, int ops, Object att)
这个函数和上一个的意义一样,多出来的 att 参数会作为 attachment 被存放在返回的 SelectionKey 中,这在需要存放一些 session state 的时候非常有用。
boolean isRegistered()
该 channel 是否已注册在一个或多个 Selector 上。
SelectableChannel 还提供了得到对应 SelectionKey 的方法:
SelectionKey keyFor(Selector sel)
返回该 channe 在 Selector 上的注册关系所对应的 SelectionKey 。若无注册关系,返回 null 。
Selector
Selector 可以同时监控多个 SelectableChannel 的 IO 状况,是非阻塞 IO 的核心。
Selector open()
Selector 的一个静态方法,用于创建实例。
在一个 Selector 中,有 3 个 SelectionKey 的集合:
1. key set 代表了所有注册在这个 Selector 上的 channel ,这个集合可以通过 keys() 方法拿到。
2. Selected-key set 代表了所有通过 select() 方法监测到可以进行 IO 操作的 channel ,这个集合可以通过 selectedKeys() 拿到。
3. Cancelled-key set 代表了已经 cancel 了注册关系的 channel ,在下一个 select() 操作中,这些 channel 对应的 SelectionKey 会从 key set 和 cancelled-key set 中移走。这个集合无法直接访问。
以下是 select() 相关方法的说明:
int select()
监控所有注册的 channel ,当其中有注册的 IO 操作可以进行时,该函数返回,并将对应的 SelectionKey 加入 selected-key set 。
int select(long timeout)
可以设置超时的 select() 操作。
int selectNow()
进行一个立即返回的 select() 操作。
Selector wakeup()
使一个还未返回的 select() 操作立刻返回。
SelectionKey
代表了 Selector 和 SelectableChannel 的注册关系。
Selector 定义了 4 个静态常量来表示 4 种 IO 操作,这些常量可以进行位操作组合成一个 bit mask 。
int OP_ACCEPT
有新的网络连接可以 accept , ServerSocketChannel 支持这一非阻塞 IO 。
int OP_CONNECT
代表连接已经建立(或出错), SocketChannel 支持这一非阻塞 IO 。
int OP_READ
int OP_WRITE
代表了读、写操作。
以下是其主要方法:
Object attachment()
返回 SelectionKey 的 attachment , attachment 可以在注册 channel 的时候指定。
Object attach(Object ob)
设置 SelectionKey 的 attachment 。
SelectableChannel channel()
返回该 SelectionKey 对应的 channel 。
Selector selector()
返回该 SelectionKey 对应的 Selector 。
void cancel()
cancel 这个 SelectionKey 所对应的注册关系。
int interestOps()
返回代表需要 Selector 监控的 IO 操作的 bit mask 。
SelectionKey interestOps(int ops)
设置 interestOps 。
int readyOps()
返回一个 bit mask ,代表在相应 channel 上可以进行的 IO 操作。
ServerSocketChannel
支持非阻塞操作,对应于 java.net.ServerSocket 这个类,提供了 TCP 协议 IO 接口,支持 OP_ACCEPT 操作。
ServerSocket socket()
返回对应的 ServerSocket 对象。
SocketChannel accept()
接受一个连接,返回代表这个连接的 SocketChannel 对象。
SocketChannel
支持非阻塞操作,对应于 java.net.Socket 这个类,提供了 TCP 协议 IO 接口,支持 OP_CONNECT , OP_READ 和 OP_WRITE 操作。这个类还实现了 ByteChannel ,ScatteringByteChannel 和 GatheringByteChannel 接口。
DatagramChannel 和这个类比较相似,其对应于 java.net.DatagramSocket ,提供了 UDP 协议 IO 接口。
Socket socket()
返回对应的 Socket 对象。
boolean connect(SocketAddress remote)
boolean finishConnect()
connect() 进行一个连接操作。如果当前 SocketChannel 是 blocking 模式,这个函数会等到连接操作完成或错误发生才返回。如果当前 SocketChannel 是 non-blocking 模式,函数在连接能立刻被建立时返回 true ,否则函数返回 false ,应用程序需要在以后用 finishConnect() 方法来完成连接操作。
Pipe
包含了一个读和一个写的 channel(Pipe.SourceChannel 和 Pipe.SinkChannel) ,这对 channel 可以用于进程中的通讯。
FileChannel
用于对文件的读、写、映射、锁定等操作。和映射操作相关的类有 FileChannel.MapMode ,和锁定操作相关的类有 FileLock 。值得注意的是 FileChannel 并不支持非阻塞操作。
Channels
这个类提供了一系列 static 方法来支持 stream 类和 channel 类之间的互操作。这些方法可以将 channel 类包装为 stream 类,比如,将 ReadableByteChannel 包装为InputStream 或 Reader ;也可以将 stream 类包装为 channel 类,比如,将 OutputStream 包装为 WritableByteChannel 。
Package java.nio.charset
这个包定义了 Charset 及相应的 encoder 和 decoder 。下面这张 UML 类图描述了这个包中类的关系,可以将其中 Charset , CharsetDecoder 和 CharsetEncoder 理解成一个 Abstract Factory 模式的实现:
Charset
代表了一个字符集,同时提供了 factory method 来构建相应的 CharsetDecoder 和 CharsetEncoder 。
Charset 提供了以下 static 的方法:
SortedMap availableCharsets()
返回当前系统支持的所有 Charset 对象,用 charset 的名字作为 set 的 key 。
boolean isSupported(String charsetName)
判断该名字对应的字符集是否被当前系统支持。
Charset forName(String charsetName)
返回该名字对应的 Charset 对象。
Charset 中比较重要的方法有:
String name()
返回该字符集的规范名。
Set aliases()
返回该字符集的所有别名。
CharsetDecoder newDecoder()
创建一个对应于这个 Charset 的 decoder 。
CharsetEncoder newEncoder()
创建一个对应于这个 Charset 的 encoder 。
CharsetDecoder
将按某种字符集编码的字节流解码为 unicode 字符数据的引擎。
CharsetDecoder 的输入是 ByteBuffer ,输出是 CharBuffer 。进行 decode 操作时一般按如下步骤进行:
1. 调用 CharsetDecoder 的 reset() 方法。(第一次使用时可不调用)
2. 调用 decode() 方法 0 到 n 次,将 endOfInput 参数设为 false ,告诉 decoder 有可能还有新的数据送入。
3. 调用 decode() 方法最后一次,将 endOfInput 参数设为 true ,告诉 decoder 所有数据都已经送入。
4. 调用 decoder 的 flush() 方法。让 decoder 有机会把一些内部状态写到输出的 CharBuffer 中。
CharsetDecoder reset()
重置 decoder ,并清除 decoder 中的一些内部状态。
CoderResult decode(ByteBuffer in, CharBuffer out, boolean endOfInput)
从 ByteBuffer 类型的输入中 decode 尽可能多的字节,并将结果写到 CharBuffer 类型的输出中。根据 decode 的结果,可能返回 3 种 CoderResult :CoderResult.UNDERFLOW 表示已经没有输入可以 decode ; CoderResult.OVERFLOW 表示输出已满;其它的 CoderResult 表示 decode 过程中有错误发生。根据返回的结果,应用程序可以采取相应的措施,比如,增加输入,清除输出等等,然后再次调用 decode() 方法。
CoderResult flush(CharBuffer out)
有些 decoder 会在 decode 的过程中保留一些内部状态,调用这个方法让这些 decoder 有机会将这些内部状态写到输出的 CharBuffer 中。调用成功返回CoderResult.UNDERFLOW 。如果输出的空间不够,该函数返回 CoderResult.OVERFLOW ,这时应用程序应该扩大输出 CharBuffer 的空间,然后再次调用该方法。
CharBuffer decode(ByteBuffer in)
一个便捷的方法把 ByteBuffer 中的内容 decode 到一个新创建的 CharBuffer 中。在这个方法中包括了前面提到的 4 个步骤,所以不能和前 3 个函数一起使用。
decode 过程中的错误有两种: malformed-input CoderResult 表示输入中数据有误; unmappable-character CoderResult 表示输入中有数据无法被解码成 unicode的字符。如何处理 decode 过程中的错误取决于 decoder 的设置。对于这两种错误, decoder 可以通过 CodingErrorAction 设置成:
1. 忽略错误
2. 报告错误。(这会导致错误发生时, decode() 方法返回一个表示该错误的 CoderResult 。)
3. 替换错误,用 decoder 中的替换字串替换掉有错误的部分。
CodingErrorAction malformedInputAction()
返回 malformed-input 的出错处理。
CharsetDecoder onMalformedInput(CodingErrorAction newAction)
设置 malformed-input 的出错处理。
CodingErrorAction unmappableCharacterAction()
返回 unmappable-character 的出错处理。
CharsetDecoder onUnmappableCharacter(CodingErrorAction newAction)
设置 unmappable-character 的出错处理。
String replacement()
返回 decoder 的替换字串。
CharsetDecoder replaceWith(String newReplacement)
设置 decoder 的替换字串。
CharsetEncoder
将 unicode 字符数据编码为特定字符集的字节流的引擎。其接口和 CharsetDecoder 相类似。
CoderResult
描述 encode/decode 操作结果的类。
CodeResult 包含两个 static 成员:
CoderResult OVERFLOW
表示输出已满
CoderResult UNDERFLOW
表示输入已无数据可用。
其主要的成员函数有:
boolean isError()
boolean isMalformed()
boolean isUnmappable()
boolean isOverflow()
boolean isUnderflow()
用于判断该 CoderResult 描述的错误。
int length()
返回错误的长度,比如,无法被转换成 unicode 的字节长度。
void throwException()
抛出一个和这个 CoderResult 相对应的 exception 。
CodingErrorAction
表示 encoder/decoder 中错误处理方法的类。可将其看成一个 enum 类型。有以下 static 属性:
CodingErrorAction IGNORE
忽略错误。
CodingErrorAction REPLACE
用替换字串替换有错误的部分。
CodingErrorAction REPORT
报告错误,对于不同的函数,有可能是返回一个和错误有关的 CoderResult ,也有可能是抛出一个 CharacterCodingException 。
Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
如果你至今还是在怀疑Java的性能,说明你的思想和观念已经完全落伍了,Java一两年就应该用新的名词来定义。从JDK1.5开始又要提供关于线程、并发等新性能的支持,Java应用在游戏等适时领域方面的机会已经成熟,Java在稳定自己中间件地位后,开始蚕食传统C的领域。
本文主要简单介绍NIO的基本原理,在下一篇文章中,将结合Reactor模式和著名线程大师Doug Lea的一篇文章深入讨论。
NIO主要原理和适用。
NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。
Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。
了解了这个基本原理,我们结合代码看看使用,在使用上,也在分两个方向,一个是线程处理,一个是用非线程,后者比较简单,看下面代码:
import java.io.*; import java.nio.*; import java.nio.channels.*; import java.nio.channels.spi.*; import java.net.*; import java.util.*; /** public class NBTest {
public void startServer() throws Exception //向Selector注册Channel及我们有兴趣的事件 while(true) //不断的轮询 }
s = "Att: " + (sk.attachment() == null ? "no" : "yes");
}
|
这是一个守候在端口9000的noblock server例子,如果我们编制一个客户端程序,就可以对它进行互动操作,或者使用telnet 主机名 90000 可以链接上。
通过仔细阅读这个例程,相信你已经大致了解NIO的原理和使用方法,下一篇,我们将使用多线程来处理这些数据,再搭建一个自己的Reactor模式。
一、 前言
- Buffer:包含数据且用于读写的线形表结构。其中还提供了一个特殊类用于内存映射文件的I/O操作。
- Charset:它提供Unicode字符串影射到字节序列以及逆映射的操作。
- Channels:包含socket,file和pipe三种管道,都是全双工的通道。
- Selector:多个异步I/O操作集中到一个或多个线程中(可以被看成是Unix中select()函数的面向对象版本)。
二、 故事开始 : 让C++程序员写Java程序!
三、 开始调查 : 怎么Java这么“傻”!
四、 继续调查 : 如此跨平台
五、 迷惑不解 : 为什么要自己消耗资源?
六、 它山之石 : 从Apache的Mina框架了解Selector
七、 真相大白 : 可爱的Java你太不容易了
八、 后记
No-Block 和Block IO 的区别:
一个典型的网络通讯步骤为: open (新建socket Chanel )--> connect( 尝试建立连接) --> accept( 连接被接受) --> read( 读取请求) send (输出结果)--> close( 连接关闭) 。
对于一个No-Block 的网络IO ,上面的每一步都是会马上返回的,当然返回的结果可能为null ,可能不为null ,这个要看下上文(context )决定。一般情况下,我们都是需要不为null 的结果,这个就需要我们在适当的时机,执行适当的步骤,这样就会得到我们想要的结果。何为适当的时机?这个下面会讲。
对于一个block 的网络IO ,上面的每一步执行的时候,如果没到适当的时机,当前线程就会被block 住,直到适当的时机,返回给你确定的结果。
当然对与No-Block 或者Block IO ,上面的每一步都有可能会抛出IOException 异常的。
NIO 编程接触的几个关键概念:
Buffer :是一块连续的内存块,是 NIO 数据读或写的中转地。Buffer 这篇blog 暂时略过不讲。
Chanel :数据的源头或者数据的目的地,用于向 buffer 提供数据或者读取 buffer 数据 ,异步 I/O 支持。
注意chanel 有2 类,一种叫SocketChanel, 一种叫ServerSocketChanel ,看名字我们就知道,一类是普通的socket chanel ,client 端和服务器端都用的,一类是专门用在server 端的。当然这个界限也不是绝对的,互为client 和server 的情况也是存在的。
Selector : chanel 事件的侦听者, 它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。使用一个select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。
SelectionKey : chanel 上发生的事件, 包含了事件的状态信息和时间以及对应的 chanel 。
Chanel 的状态:
可连( Connectable ):当一个 Chanel 完成 socket 连接操作已完成或者已失败放弃时
能连( Acceptable ):当一个 Chanel 已经准备好接受一个新的 socket 连接时
可读( Readable ):当一个 Chanel 能被读时
可写( Writable ):当一个 Chanel 能被写时
结合对照上面的网络通讯步骤我们可以有以下推导出的结论:
当一个 Server Chanel 是 Connectable 时, client 端尝试 connect 才会成功。
当一个 Server Chanel 是 Acceptable 时, client 的连接请求被真正受理,一个新的 chanel 会被生成,并且记录了 localAdrress 和 remoteAddress. 为进一步读写做准备。
当一个 Chanel 是 Readable 时,我们从这个 Chanel 中读取数据才会成功。
当一个 Chanel 是 Writable 时,我们往这个 Chanel 中写数据才会成功。
记住一点,对于一个 No-Block 的 Chanel 来说,上面 4 个操作都会马上返回或者抛出 IOException ,但是是不是成功就难说了,前面就说了,我们在一个 Chanel 做操作的时候,我们要密切关注 Chanel 的当前状态。只有在知道 Chanel 的当前状态时,我们才能在这个 Chanel 上做最适当的操作。
聪明的你可能马上就会想到,要是你操作的 Chanel 的状态的转换信息能被你抓取,这些问题就迎刃而解了。对啦, NIO 就是这样设计的。一个 Chanel 可以注册一个 Selector (就像一个事件侦听器),而且你还要告知你想要要侦听的状态。用一段代码来说明下:
selector = SelectorProvider.provider().openSelector();
serverChannel1 = ServerSocketChannel.open();
serverChannel1.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress("localhost", 9999);
serverChannel1.socket().bind(isa);
serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
这段代码的意思就是我们打开了一个 ServerChanel ,侦听本机的 9999 端口,并且新建了一个 Selector, 然后这个 ServerChanel 注册了这个 Selector ,并且指定了它感兴趣的状态类型是 OP_ACCEPT. 这样有什么效果呢?
注意红色那句,这句意思是selector要求serverChannel1状态为acceptable的时候把这个消息告诉selector.
效果就是:
当这个 ServerChanel 状态为 Acceptable 时, Selector 就会收到一个消息,这个消息当然就是一个 SelectionKey对象。调用 Selector 的 selectedKeys ()方法,我们就能得到所有 Chanel 发送过来的消息。
因为 SelectionKey 包含 事件的状态,时间以及对应的 Chanel ,很自然的,我们遍历这个 Set<SelectionKey>, 根据 SelectionKey 的状态,就能在相应的 Chanel 做正确的操作。比如,能读的时候我们就读,能写的时候我们就写。
最后讲讲 Server 端和 Client 编程的一般步骤:
对于 Client 来一般是这样的:
InetSocketAddress isa = new InetSocketAddress(host, port);
SocketChannel sc = null;
sc = SocketChannel.open();
sc.connect(isa);
sc.write(data);
…
Sc.read(buff);
构造一个 InetSocketAddress 对象 --> open --> connect --> write --> read
注意这里用的不是 No-Block 的方式,因为 client 如果没有得到 server 端的正确回应的话就采取下一步操作无疑是没有意义的。
Server 端:
selector = SelectorProvider.provider ().openSelector();
serverChannel = ServerSocketChannel.open ();
serverChannel .configureBlocking( false );
InetSocketAddress isa = new InetSocketAddress( "localhost" , 9999 );
serverChannel .socket().bind(isa);
serverChannel .register( selector , SelectionKey. OP_ACCEPT );
构造一个 Selector --> 打开一个 serverSocketChanel --> 设定 serverSocketChanel 为 no-block--> bind serverSocketChanel 到一个 host 和 port --> register Selector 并告知感兴趣的状态类型转换。
在 SelectionKey Set 上遍历操作:
while (true) {
selector.select();
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
}
在这个循环里面我们会根据 SelectionKey 的状态,采取不同的操作的。当连接被 accepted 时, 一个新的 chanel会被生成,并且记录了 localAdrress 和 remoteAddress. 为进一步读写做准备。 accept 函数如下:
public void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChanel socketChannel1 = serverSocketChannel.accept();
socketChannel1.configureBlocking(false);
socketChannel1.register(selector, SelectionKey.OP_READ);
}
这里新的 Chanel 被构建,最后同样会注册到 selector , 同时要求当这个 Chanel 为 Readable 时,一个SelectionKey 被放入到 Selector 中。这样上面循环会用 read(key) 来处理这个 SelectionKey.
JDK 1.4 中引入的新输入输出 (NIO) 库在标准 Java 代码中提供了高速的、面向块的 I/O。本实用教程从高级概念到底层的编程细节,非常详细地介绍了 NIO 库。您将学到诸如缓冲区和通道这样的关键 I/O 元素的知识,并考察更新后的库中的标准 I/O 是如何工作的。您还将了解只能通过 NIO 来完成的工作,如异步 I/O 和直接缓冲区。
◆ 输入/输出:概念性描述
I/O 简介
I/O ? 或者输入/输出 ? 指的是计算机与外部世界或者一个程序与计算机的其余部分的之间的接口。它对于任何计算机系统都非常关键,因而所有 I/O 的主体实际上是内置在操作系统中的。单独的程序一般是让系统为它们完成大部分的工作。
在 Java 编程中,直到最近一直使用 流 的方式完成 I/O。所有 I/O 都被视为单个的字节的移动,通过一个称为 Stream 的对象一次移动一个字节。流 I/O 用于与外部世界接触。它也在内部使用,用于将对象转换为字节,然后再转换回对象。
NIO 与原来的 I/O 有同样的作用和目的,但是它使用不同的方式? 块 I/O。正如您将在本教程中学到的,块 I/O 的效率可以比流 I/O 高许多。
为什么要使用 NIO?
NIO 的创建目的是为了让 Java 程序员可以实现高速 I/O 而无需编写自定义的本机代码。NIO 将最耗时的 I/O 操作(即填充和提取缓冲区)转移回操作系统,因而可以极大地提高速度。
流与块的比较
原来的 I/O 库(在 java.io.*中) 与 NIO 最重要的区别是数据打包和传输的方式。正如前面提到的,原来的 I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。
面向流 的 I/O 系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。为流式数据创建过滤器非常容易。链接几个过滤器,以便每个过滤器只负责单个复杂处理机制的一部分,这样也是相对简单的。不利的一面是,面向流的 I/O 通常相当慢。
一个 面向块 的 I/O 系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据块。按块处理数据比按(流式的)字节处理数据要快得多。但是面向块的 I/O 缺少一些面向流的 I/O 所具有的优雅性和简单性。
集成的 I/O
在 JDK 1.4 中原来的 I/O 包和 NIO 已经很好地集成了。 java.io.* 已经以 NIO 为基础重新实现了,所以现在它可以利用 NIO 的一些特性。例如, java.io.* 包中的一些类包含以块的形式读写数据的方法,这使得即使在更面向流的系统中,处理速度也会更快。
也可以用 NIO 库实现标准 I/O 功能。例如,可以容易地使用块 I/O 一次一个字节地移动数据。但是正如您会看到的,NIO 还提供了原 I/O 包中所没有的许多好处。
◆ 通道和缓冲区
概 述
通道 和 缓冲区 是 NIO 中的核心对象,几乎在每一个 I/O 操作中都要使用它们。
通道是对原 I/O 包中的流的模拟。到任何目的地(或来自任何地方)的所有数据都必须通过一个 Channel 对象。一个 Buffer 实质上是一个容器对象。发送给一个通道的所有对象都必须首先放到缓冲区中;同样地,从通道中读取的任何数据都要读到缓冲区中。
在本节中,您会了解到 NIO 中通道和缓冲区是如何工作的。
什么是缓冲区?
Buffer 是一个对象, 它包含一些要写入或者刚读出的数据。 在 NIO 中加入 Buffer 对象,体现了新库与原 I/O 的一个重要区别。在面向流的 I/O 中,您将数据直接写入或者将数据直接读到 Stream 对象中。
在 NIO 库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。在写入数据时,它是写入到缓冲区中的。任何时候访问 NIO 中的数据,您都是将它放到缓冲区中。
缓冲区实质上是一个数组。通常它是一个字节数组,但是也可以使用其他种类的数组。但是一个缓冲区不 仅仅 是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。
缓冲区类型
最常用的缓冲区类型是 ByteBuffer。一个 ByteBuffer 可以在其底层字节数组上进行 get/set 操作(即字节的获取和设置)。
ByteBuffer 不是 NIO 中唯一的缓冲区类型。事实上,对于每一种基本 Java 类型都有一种缓冲区类型:
• ByteBuffer
• CharBuffer
• ShortBuffer
• IntBuffer
• LongBuffer
• FloatBuffer
• DoubleBuffer
每一个 Buffer 类都是 Buffer 接口的一个实例。 除了 ByteBuffer,每一个 Buffer 类都有完全一样的操作,只是它们所处理的数据类型不一样。因为大多数标准 I/O 操作都使用 ByteBuffer,所以它具有所有共享的缓冲区操作以及一些特有的操作。
现在您可以花一点时间运行 UseFloatBuffer.java,它包含了类型化的缓冲区的一个应用例子。
什么是通道?
Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。
正如前面提到的,所有数据都通过 Buffer 对象来处理。您永远不会将字节直接写入通道中,相反,您是将数据写入包含一个或者多个字节的缓冲区。同样,您不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
通道类型
通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类), 而 通道 可以用于读、写或者同时用于读写。
因为它们是双向的,所以通道可以比流更好地反映底层操作系统的真实情况。特别是在 UNIX 模型中,底层操作系统通道是双向的。
◆ 从理论到实践:NIO 中的读和写
概 述
读和写是 I/O 的基本过程。从一个通道中读取很简单:只需创建一个缓冲区,然后让通道将数据读到这个缓冲区中。写入也相当简单:创建一个缓冲区,用数据填充它,然后让通道用这些数据来执行写入操作。
在本节中,我们将学习有关在 Java 程序中读取和写入数据的一些知识。我们将回顾 NIO 的主要组件(缓冲区、通道和一些相关的方法),看看它们是如何交互以进行读写的。在接下来的几节中,我们将更详细地分析这其中的每个组件以及其交互。
从文件中读取
在我们第一个练习中,我们将从一个文件中读取一些数据。如果使用原来的 I/O,那么我们只需创建一个 FileInputStream 并从它那里读取。而在 NIO 中,情况稍有不同:我们首先从 FileInputStream 获取一个 FileInputStream 对象,然后使用这个通道来读取数据。
在 NIO 系统中,任何时候执行一个读操作,您都是从通道中读取,但是您不是 直接 从通道读取。因为所有数据最终都驻留在缓冲区中,所以您是从通道读到缓冲区中。
因此读取文件涉及三个步骤:(1) 从 FileInputStream 获取 Channel,(2) 创建 Buffer,(3) 将数据从 Channel 读到 Buffer 中。
现在,让我们看一下这个过程。
三个容易的步骤
第一步是获取通道。我们从 FileInputStream 获取通道:
FileInputStream fin = new FileInputStream( "readandshow.txt" );
FileChannel fc = fin.getChannel();
下一步是创建缓冲区:
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
最后,需要将数据从通道读到缓冲区中,如下所示:
fc.read( buffer );
您会注意到,我们不需要告诉通道要读 多少数据 到缓冲区中。每一个缓冲区都有复杂的内部统计机制,它会跟踪已经读了多少数据以及还有多少空间可以容纳更多的数据
写入文件
在 NIO 中写入文件类似于从文件中读取。首先从 FileOutputStream 获取一个通道:
FileOutputStream fout = new FileOutputStream( "writesomebytes.txt" );
FileChannel fc = fout.getChannel();
下一步是创建一个缓冲区并在其中放入一些数据 - 在这里,数据将从一个名为 message 的数组中取出,这个数组包含字符串 "Some bytes" 的 ASCII 字节(本教程后面将会解释 buffer.flip() 和 buffer.put() 调用)。
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
for (int i=0; i
buffer.put( message[i] );
}
buffer.flip();
最后一步是写入缓冲区中
fc.write( buffer );
注意在这里同样不需要告诉通道要写入多数据。缓冲区的内部统计机制会跟踪它包含多少数据以及还有多少数据要写入。
读写结合
下面我们将看一下在结合读和写时会有什么情况。我们以一个名为 CopyFile.java 的简单程序作为这个练习的基础,它将一个文件的所有内容拷贝到另一个文件中。CopyFile.java 执行三个基本操作:首先创建一个 Buffer,然后从源文件中将数据读到这个缓冲区中,然后将缓冲区写入目标文件。这个程序不断重复 ― 读、写、读、写 ― 直到源文件结束。
CopyFile 程序让您看到我们如何检查操作的状态,以及如何使用 clear() 和 flip() 方法重设缓冲区,并准备缓冲区以便将新读取的数据写到另一个通道中。
运行 CopyFile 例子
因为缓冲区会跟踪它自己的数据,所以 CopyFile 程序的内部循环 (inner loop) 非常简单,如下所示:
fcin.read( buffer );
fcout.write( buffer );
第一行将数据从输入通道 fcin 中读入缓冲区,第二行将这些数据写到输出通道 fcout 。
检查状态
下一步是检查拷贝何时完成。当没有更多的数据时,拷贝就算完成,并且可以在 read() 方法返回 -1 是判断这一点,如下所示:
int r = fcin.read( buffer );
if (r==-1) {
break;
}
重设缓冲区
最后,在从输入通道读入缓冲区之前,我们调用 clear() 方法。同样,在将缓冲区写入输出通道之前,我们调用 flip() 方法,如下所示
buffer.clear();int r = fcin.read( buffer );
if (r==-1) {
break;
}
buffer.flip();
fcout.write( buffer );
clear() 方法重设缓冲区,使它可以接受读入的数据。 flip() 方法让缓冲区可以将新读入的数据写入另一个通道。
Selector 的出现,大大改善了多个 Java Socket的效率。在没有NIO的时候,轮询多个socket是通过read阻塞来完成,即使是非阻塞模式,我们在轮询socket是否就绪的时候依然需要使用系统调用。而Selector的出现,把就绪选择交给了操作系统(我们熟知的selec函数),把就绪判断和读取数据分开,不仅性能上大有改善,而且使得代码上更加清晰。
Java NIO的选择器部分,实际上有三个重要的类。
1,Selector 选择器,完成主要的选择功能。select(), 并保存有注册到他上面的通道集合。
2,SelectableChannel 可被注册到Selector上的通道。
3,SelectionKey 描述一个Selector和SelectableChannel的关系。并保存有通道所关心的操作。
接下来,便是一个通用的流程。
首先, 创建选择器,
然后,注册通道,
其次,选择就绪通道,
最后,处理已就绪通道数据。
让我们通过代码来看这些步骤是如何完成的。
- Selector selector = Selector.open();
- channel1.configureBlocking(false);
- channel2.configureBlocking(false);
- cahnnel3.configureBlocking(false);
- SelectionKey key1 = channel1.register(selector, SelectionKey.OP_READ);
- SelectionKey key2 = channel2.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ);
- SelectionKey key3 = channel3.register(selector, SelectionKey.OP_WRITE);
- while(true){
- int readyCount = selector.select(1000);
- if( readyCount == 0) continue;
- Iterator<SelectionKey> iter = selector.selectedKeys.iterator();
- while(iter.hasNext()){
- SelectionKey key = iter.next();
- if( key.isReadable()){
- readData(key);
- }
- iter.remove();
- }
- }
上面的代码是一个示例。我们可以看到,创建一个Selector使用open方法,这是一个静态工厂模式,注意他的异常处理是IOException。接下来的通道,我们并没有说明是什么通道,一般来说,基本上Socket类通道是可选择的,但是文件类的是不可选择的。
我们可以看到的是,这个通道调用了 configureBlocking(false)这样的方法,在注册到Selector上之前,通道应该保证是非阻塞的,否则异常IllegalBlockingModeException抛出。
之后我们开始注册通道,使用registor方法,主意后面一个参数,如果对一个只读的通道注册写操作,是会抛出异常IllegalArgumentException的。例如SocketChannel不支持accept操作。这里一共有四种操作 read,write,accept,connect。
当然,我们还不能把已经关闭的通道注册到Selector中,而Selector如果调用close,那么试图访问它的大多数操作都会抛出异常。
接下来,我们开始使用select函数更新selectedKey,这里比较复杂,但是从代码看,我们做完select以后,就开始便利selectedKey,找到符合要求的key,进行读数据操作。这里还要注意的是,使用完key以后,需要从selectedKey集合中删除。
下面我们还有更详细的说明,因为我们还不知道这个select到底做了说明,selectedKey又是如何更新的呢?
首先,一个selectionKey 包含了两个集合,一个是 注册的感兴趣的操作集合,一个是已经准备好的集合。第一个集合基本上是注册就确定的,或者通过interestOps(int)来改变。select是不会改变interest集合的。但是select改变的是 ready集合。也就是准备好的感兴趣的操作的集合,这样说,也说明,ready集合实际上是interest集合的子集。
如何使用这些集合呢?
看代码:
- if (( key.readyOps() & SelectionKey.OP_READ) != 0)
- {
- myBuffer.clear();
- key.channel().read(myBuffer);
- doSomething(myBuffer.flip());
- }
从上面的代码看出,这个集合只是一个掩码,需要和操作与,才能得到结果。
当然,也有更方便的用法。
- if ( key.isReadable() )
还要注意的是,这样的判断并不是就是一定的,只是一个提示。底层通道随时在改变。
对于SelectionKey, 还可以执行cancel操作,一个被cancel掉的SelectionKey,实际上只是被放到了Selector的cancel键集合里,键马上失效,但是通道依然是注册状态,要等到下一个select时才真正取消注册。
现在,我们再来看看选择器做了什么。选择器是就绪选择的核心,它包含了注册到它上面的通道与操作关系的Key,它维护了三个集合。
1,已经注册的键集合 调用, keys()
2,已经选择的键集合 调用, selectedKeys()
3,已经取消的键集合 私有。
选择器虽然封装了select,poll等底层的系统调用,但是她有自己的一套来管理这些键。
每当select被调用时,她做如下检查:
1,检查已经取消的键的集合。如果非空,从其他两个集合中移除已经取消的键,注销相关通道,清空已经取消的键的集合。
2,已注册的键的集合中的键的interest集合被检查。例如有新的interest的操作注册。但是这一步不会影响后面的操作。这是延时到下一次select调用时才会影响的。
就绪条件确认后,底层系统进行查询。依赖于select方法的参数,如果没有通道准备好,根select带的参数超时设置,可能会阻塞线程。
系统调用完成后,可以对操作系统指示的已经准备好的interest集合中的一种操作的通道,执行以下操作:
a: 如果通道的键还没有在已经选择的键的集合中,那么键的ready集合将被清空。然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将被设置。
b: 否则,一旦通道的键被放入已经选择的键的集合中时,ready集合不会被清除,而是累积。这就是说,如果之前的状态是ready的操作,本次已经不是ready了,但是他的bit位依然表示是ready,不会被清除。
3, 步骤2可能会有很长一段时间的休眠。所以在步骤2完成以后,步骤1继续执行以确保被取消的键正确处理。
4,返回值,select的返回值说明的是从上一次调用到本次调用,就绪选择的个数。如果上一次就已经是就绪的,那么本次不统计。这是是为何返回为0时,我们continue的原因。
这里使用的延迟注销方法,正是为了解决注销键的问题。如果线程在取消键的同时进行通道注销,那么很可能阻塞并与正在进行的选择操作发生冲突。
同样我们有3中select可以选择:
1, select()
2, select(long timeout)
3, selectNow();
select()会阻塞线程知道又一个通道就绪。
而select带timeout的会在特定时间内阻塞,或者至少有一个通道就绪。
而selectNow()如果没有发现就绪,就直接返回。
如何停止中断选择呢?
有三种方法。
1, wakeup()这是一种优雅的方法,同时也是延时的。如果当前没有正进行的选择操作,也就是要等到下一个select才起作用。
2, close()选择器的close被调用,则所有在选择操作中阻塞的线程被唤醒,相关通道被注销,键也被取消。
3, interrupt() 实际上interrupt并不会中断线程。而是设置线程中断标志。
然后依然是调用wakeup()。这是因为 Selector 捕获了interruptedException,然后在异常处理中调用了 wakeup()
根据以上的信息,我们可以了解到,实际上选择器对选择键中的集合的操作,是交给程序员来完成的。如何管理选择键,是很关键的。
这里需要记住的是,ready集合中的比特位,是累积的。根据步骤2,如果一个键是在选择集合中,那么这个键的ready集合是不会被清除的。而如果这个键不在选择集合中,那么就要首先清空这个键的ready集合,然后把就绪信息更新到这个ready集合上,最后,就是把这个键加入到已选择的集合中。
这也是为什么上面的流程中,我们为什么要把处理的键删除,因为如果不删除,下一次的信息是累积的,我们就不能分出本次select中那些操作就绪了。如果清除掉,那么下一次如果就绪,ready集合就是重置后更新的信息。
前些时候花了一些时间在研究java.nio的api使用机制,看了好久,也觉得不习惯它的使用方式和用法.毕竟自己对C语言了解太少,也不太了解C语言在网络编程上的用法。对这种底层下的编程太不习惯,还是应该好好了解下底层的东西,要不然就光会使用别人的东西,如果是自己写一个,就写不出来了。
从java1.4以来,java nio就出现在java的api中,在日常的使用当中,基本上都是围绕着java.io中的几个inputStream(outputStream)和reader(writer)在转,要想编写一些其他形式的调用,还真不会。我也看了下最新的springframework中的FileCopyUtils中的代码,也是将各种操作集合给java.io来做。好像java.nio用得不是很多。看了下java.nio的描述信息,感觉这是用在网络编程上的。比如文件下载服务,通信服务等地方。自己暂时还用不上网络上的编程,不过等到用的时候还去学,就太晚了。
看了下关于Selector的使用方法,官方的说法是一个“多路复用器”,从我自己的角度来讲就感觉像一个服务总线,负责统一各个处理程序的消息注册,统一接收客户端或服务器消息信息,再分发给不同的事件处理程序进行处理。整个流程就一个注册->接收->处理的过程,从使用者的角度来讲,直接使用这些api还不太成熟,毕竟这些api都太底层了,需要了解太多的技术细节,也不太适合像我这种不了解C语言网络编程的人。这周花了三天的时候专门研究了下整个java.nio包,重点看了下关于Selector的运用(datagram和pipe还不太会用),结合了网络上的很多例子(尤其是《java nio》这本书上的例子),对selector总算有了很大的认识,对底层的io编程也有了新的了解。
写了个模拟下载的例子,服务器端模拟一个拥有整个硬盘资源的处理程序。客户端通过发送要下载的文件(通过完整文件路径),从而实现由服务器写文件到客户端,客户端保存接收的整体流程。其中,仅涉及到了数据传输的基本运用,即没有运用到网络编程上的urlConnection,也没有用到专门的socket,客户端也没有实现一个文件多线程下载的机制。仅仅作为一个selector的下载练习使用(当然,如果要求不高,也可以用到实际编程的)。
服务器端基本思路就是打开链接,绑定端口,接收信息,处理信息。详细过程如下:
第一步:创建服务器端socketChannel,并绑定指定端口,注册到selector上。
- selector = Selector.open();
- ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.socket().bind(new InetSocketAddress(1234));
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
都是标准的步骤,先open,再配置block为异步的,服务器socket绑定本机端口,注册到selector上,并指定key为ACCEPT。
第二步:接收消息,处理信息咯。
- for(; ;) {
- selector.select();
- Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
- while(keyIterator.hasNext()) {
- SelectionKey key = keyIterator.next();
- if(key.isValid())
- handle(key);
- keyIterator.remove();
- }
- }
这也是标准步骤,先进行select,再获得selectedKey,迭代,处理,再remove掉。
在网上,看到有些例子中,对selector.select()中返回的值进行判断,如果返回为o则continue,在我这个程序中,经测试当selector.select()返回为0时,而selector.selectedKeys()确不为0,这样就没有处理信息了。从官方doc上来看,关于select()的返回值解释为“已更新其准备就绪操作集的键的数目,该数目可能为零”,即这个数目指已更新的键集,故在处理中可能键集没有更新,而选择的消息处理keys却不为0,这种情况是正确的。不清楚是不是这个意思,还望高人来解释一下。
第三步:就是handle方法了,处理消息事件。
- if(key.isAcceptable()) {
- ServerSocketChannel channel = (ServerSocketChannel) key.channel();
- SocketChannel socketChannel = channel.accept();
- socketChannel.configureBlocking(false);
- socketChannel.register(selector, SelectionKey.OP_READ);//注册读事件
- map.put(socketChannel, new Handle());//把socket和handle进行绑定
- }
- //用map中的handle处理read和write事件,以模拟多个文件同时进行下载
- if(key.isReadable() || key.isWritable()) {
- SocketChannel socketChannel = (SocketChannel) key.channel();
- final Handle handle = map.get(socketChannel);
- if(handle != null)
- handle.handle(key);
- }
在以上方法中,我在主方法中仅处理appcet事件,再为每个连接到的socketChannel注册读事件,再在读消息处理中注册写事件。而读和写消息处理,我用了一个内部类来处理,即每个内部类来绑定一个socketChannel,单独处理每个socketChannel。这样的处理,是满足客户端对服务器端发起多个请求,来下载不同的文件,这样服务器端就可为不同的客户端socketChannel定制不同的处理程序了。内部类的定义如下:
- private class Handle{
- private StringBuilder message;
- private boolean writeOK = true;
- private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
- private FileChannel fileChannel;
- private String fileName;
- }
message指由客户端发送的信息,在此定义此信息为客户端请求的文件信息。由message来得到服务器端的文件名路径信息,并保存到fileName中,fileChannel即为由此fileName取得的channel。byteBuffer就是用来写数据的字节数据缓冲器了。
handle单独处理读和写事件,在读事件中,解析文件名,并注册写事件,代码如下:
- if(key.isReadable()) {
- SocketChannel socketChannel = (SocketChannel) key.channel();
- if(writeOK)
- message = new StringBuilder();
- while(true) {
- byteBuffer.clear();
- int r = socketChannel.read(byteBuffer);
- if(r == 0)
- break;
- if(r == -1) {
- socketChannel.close();
- key.cancel();
- return;
- }
- message.append(new String(byteBuffer.array(), 0, r));
- }
- //将接收到的信息转化成文件名,以映射到服务器上的指定文件
- if(writeOK && invokeMessage(message)) {
- socketChannel.register(selector, SelectionKey.OP_WRITE);
- writeOK = false;
- }
- }
以上代码就主要是读信息,并解析信息成一个文件名,并注册写事件了。当然还处理客户端断开连接事件,读到信息为-1时,断开连接。其中处理文件信息代码如下:
- String m = message.toString();
- try {
- File f = new File(m);
- if(!f.exists())
- return false;
- fileName = m;
- return true;
- } catch(Exception e) {
- return false;
- }
其中就是将message转化成一个fileName,以供在写的时候能够从fileName中取得fileChannel,此方法保存fileName是存在的。
下面看写事件的处理:
- //向客户端写数据
- if(key.isWritable()) {
- if(!key.isValid())
- return;
- SocketChannel socketChannel = (SocketChannel) key.channel();
- if(fileChannel == null)
- fileChannel = new FileInputStream(fileName).getChannel();
- byteBuffer.clear();
- int w = fileChannel.read(byteBuffer);
- //如果文件已写完,则关掉key和socket
- if(w <= 0) {
- fileName = null;
- fileChannel.close();
- fileChannel = null;
- writeOK = true;
- socketChannel.close();
- key.channel();
- return;
- }
- byteBuffer.flip();
- socketChannel.write(byteBuffer);
- }
写处理中,主要就是打开本地的文件channel将fileChannel中的数据写到socketChannel中,如果数据已经写完毕,则关掉相应channel。
至此,服务器端的信息就处理完毕,运行这个程序就只需要在main方法中,new().call()就可以了。当然,有服务器端还需要客户端才行,客户端信息请参照下一笔记。
服务器端代码随附件中。
- 容 量(Capacity ): 容量描述了这个缓冲区最 多能够存放多少,也是Buffer的最大存储元素量,这个值是在创建Buffer的时候指定的,而且不可以更改
- 限 制(Limit ): 不能够进行读写的缓冲区 的第一个元素,换句话说就是这个Buffer里面的活动元素数量
- 位 置(Position ): 下一个需要进行读写的元 素的索引,当Buffer缓冲区调用相对get()和set()方法的时候会自动更新Position的值
- 标记( Mark ): 一个可记忆的 Position位置的值,当调用mark()方法的时候会执行mark = position,一旦调用reset()的时候就执行position = mark,和Position有点不一样,除非进行设置,否则Mark值是不存在的。
- 两 个对象应该是 同类型 的,Buffer包含了 不同的数据类型就绝对不可能相等
- 两 个Buffer对象position到limit之间的元素数量(remaining返回值)相同,两个Buffer的 容量可以不一样 ,而且两个Buffer的 索引位置也可以不一样 ,但是Buffer的remaining(从 position到limit)方法返回值必须是相同的
- 从remaining段的出示位置到结束位置里面的每一个元素都 必 须相 同
本文简介: JDK 1.4 中引入的新输入输出 (NIO) 库在标准 Java 代码中提供了高速的、面向块的 I/O。本实用教程从高级概念到底层的编程细节,非常详细地介绍了 NIO 库。您将学到诸如缓冲区和通道这样的关键 I/O 元素的知识,并考察更新后的库中的标准 I/O 是如何工作的。您还将了解只能通过 NIO 来完成的工作,如异步 I/O 和直接缓冲区。
输入/输出:概念性描述
I/O ? 或者输入/输出 ? 指的是计算机与外部世界或者一个程序与计算机的其余部分的之间的接口。它对于任何计算机系统都非常关键,因而所有 I/O 的主体实际上是内置在操作系统中的。单独的程序一般是让系统为它们完成大部分的工作。
在 Java 编程中,直到最近一直使用 流 的方式完成 I/O。所有 I/O 都被视为单个的字节的移动,通过一个称为 Stream 的对象一次移动一个字节。流 I/O 用于与外部世界接触。它也在内部使用,用于将对象转换为字节,然后再转换回对象。
NIO 与原来的 I/O 有同样的作用和目的,但是它使用不同的方式? 块 I/O。正如您将在本教程中学到的,块 I/O 的效率可以比流 I/O 高许多。
NIO 的创建目的是为了让 Java 程序员可以实现高速 I/O 而无需编写自定义的本机代码。NIO 将最耗时的 I/O 操作(即填充和提取缓冲区)转移回操作系统,因而可以极大地提高速度。
原来的 I/O 库(在 java.io.*
中) 与 NIO 最重要的区别是数据打包和传输的方式。正如前面提到的,原来的 I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。
面向流 的 I/O 系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。为流式数据创建过滤器非常容易。链接几个过滤器,以便每个过滤器只负责单个复杂处理机制的一部分,这样也是相对简单的。不利的一面是,面向流的 I/O 通常相当慢。
一个 面向块 的 I/O 系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据块。按块处理数据比按(流式的)字节处理数据要快得多。但是面向块的 I/O 缺少一些面向流的 I/O 所具有的优雅性和简单性。
在 JDK 1.4 中原来的 I/O 包和 NIO 已经很好地集成了。 java.io.*
已经以 NIO 为基础重新实现了,所以现在它可以利用 NIO 的一些特性。例如, java.io.*
包中的一些类包含以块的形式读写数据的方法,这使得即使在更面向流的系统中,处理速度也会更快。
也可以用 NIO 库实现标准 I/O 功能。例如,可以容易地使用块 I/O 一次一个字节地移动数据。但是正如您会看到的,NIO 还提供了原 I/O 包中所没有的许多好处。
通道和缓冲区
通道
和 缓冲区
是 NIO 中的核心对象,几乎在每一个 I/O 操作中都要使用它们。
通道是对原 I/O 包中的流的模拟。到任何目的地(或来自任何地方)的所有数据都必须通过一个 Channel 对象。一个 Buffer 实质上是一个容器对象。发送给一个通道的所有对象都必须首先放到缓冲区中;同样地,从通道中读取的任何数据都要读到缓冲区中。
在本节中,您会了解到 NIO 中通道和缓冲区是如何工作的。
Buffer
是一个对象, 它包含一些要写入或者刚读出的数据。 在 NIO 中加入 Buffer
对象,体现了新库与原 I/O 的一个重要区别。在面向流的 I/O 中,您将数据直接写入或者将数据直接读到 Stream
对象中。
在 NIO 库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。在写入数据时,它是写入到缓冲区中的。任何时候访问 NIO 中的数据,您都是将它放到缓冲区中。
缓冲区实质上是一个数组。通常它是一个字节数组,但是也可以使用其他种类的数组。但是一个缓冲区不 仅仅 是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。
最常用的缓冲区类型是 ByteBuffer
。一个 ByteBuffer
可以在其底层字节数组上进行 get/set 操作(即字节的获取和设置)。
ByteBuffer
不是 NIO 中唯一的缓冲区类型。事实上,对于每一种基本 Java 类型都有一种缓冲区类型:
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
每一个 Buffer
类都是 Buffer
接口的一个实例。 除了 ByteBuffer
,每一个 Buffer 类都有完全一样的操作,只是它们所处理的数据类型不一样。因为大多数标准 I/O 操作都使用 ByteBuffer
,所以它具有所有共享的缓冲区操作以及一些特有的操作。
现在您可以花一点时间运行 UseFloatBuffer.java,它包含了类型化的缓冲区的一个应用例子。
Channel
是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。
正如前面提到的,所有数据都通过 Buffer
对象来处理。您永远不会将字节直接写入通道中,相反,您是将数据写入包含一个或者多个字节的缓冲区。同样,您不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是 InputStream
或者 OutputStream
的子类), 而 通道
可以用于读、写或者同时用于读写。
因为它们是双向的,所以通道可以比流更好地反映底层操作系统的真实情况。特别是在 UNIX 模型中,底层操作系统通道是双向的。
从理论到实践:NIO 中的读和写
读和写是 I/O 的基本过程。从一个通道中读取很简单:只需创建一个缓冲区,然后让通道将数据读到这个缓冲区中。写入也相当简单:创建一个缓冲区,用数据填充它,然后让通道用这些数据来执行写入操作。
在本节中,我们将学习有关在 Java 程序中读取和写入数据的一些知识。我们将回顾 NIO 的主要组件(缓冲区、通道和一些相关的方法),看看它们是如何交互以进行读写的。在接下来的几节中,我们将更详细地分析这其中的每个组件以及其交互。
在我们第一个练习中,我们将从一个文件中读取一些数据。如果使用原来的 I/O,那么我们只需创建一个 FileInputStream
并从它那里读取。而在 NIO 中,情况稍有不同:我们首先从 FileInputStream
获取一个 FileInputStream
对象,然后使用这个通道来读取数据。
在 NIO 系统中,任何时候执行一个读操作,您都是从通道中读取,但是您不是 直接 从通道读取。因为所有数据最终都驻留在缓冲区中,所以您是从通道读到缓冲区中。
因此读取文件涉及三个步骤:(1) 从 FileInputStream
获取 Channel
,(2) 创建 Buffer
,(3) 将数据从 Channel
读到 Buffer
中。
现在,让我们看一下这个过程。
第一步是获取通道。我们从 FileInputStream
获取通道:
FileInputStream fin = new FileInputStream( "readandshow.txt" );
FileChannel fc = fin.getChannel(); |
下一步是创建缓冲区:
1
|
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
|
最后,需要将数据从通道读到缓冲区中,如下所示:
fc.read( buffer ); |
您会注意到,我们不需要告诉通道要读 多少数据 到缓冲区中。每一个缓冲区都有复杂的内部统计机制,它会跟踪已经读了多少数据以及还有多少空间可以容纳更多的数据
在 NIO 中写入文件类似于从文件中读取。首先从 FileOutputStream
获取一个通道:
1
2
|
FileOutputStream fout = new FileOutputStream( "writesomebytes.txt" );
FileChannel fc = fout.getChannel(); |
下一步是创建一个缓冲区并在其中放入一些数据 - 在这里,数据将从一个名为 message
的数组中取出,这个数组包含字符串 "Some bytes" 的 ASCII 字节(本教程后面将会解释 buffer.flip()
和 buffer.put()
调用)。
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
for ( int i= 0 ; i<message.length; ++i) {
buffer.put( message[i] );
} buffer.flip(); |
最后一步是写入缓冲区中
fc.write( buffer ); |
注意在这里同样不需要告诉通道要写入多数据。缓冲区的内部统计机制会跟踪它包含多少数据以及还有多少数据要写入。
下面我们将看一下在结合读和写时会有什么情况。我们以一个名为 CopyFile.java 的简单程序作为这个练习的基础,它将一个文件的所有内容拷贝到另一个文件中。CopyFile.java 执行三个基本操作:首先创建一个 Buffer
,然后从源文件中将数据读到这个缓冲区中,然后将缓冲区写入目标文件。这个程序不断重复 ― 读、写、读、写 ― 直到源文件结束。
CopyFile 程序让您看到我们如何检查操作的状态,以及如何使用 clear()
和 flip()
方法重设缓冲区,并准备缓冲区以便将新读取的数据写到另一个通道中。
因为缓冲区会跟踪它自己的数据,所以 CopyFile 程序的内部循环 (inner loop) 非常简单,如下所示:
fcin.read( buffer ); fcout.write( buffer ); |
第一行将数据从输入通道 fcin
中读入缓冲区,第二行将这些数据写到输出通道 fcout
。
下一步是检查拷贝何时完成。当没有更多的数据时,拷贝就算完成,并且可以在 read()
方法返回 -1 是判断这一点,如下所示:
int r = fcin.read( buffer );
if (r==- 1 ) {
break ;
} |
最后,在从输入通道读入缓冲区之前,我们调用 clear()
方法。同样,在将缓冲区写入输出通道之前,我们调用 flip()
方法,如下所示
1
2
3
4
5
6
7
8
9
|
buffer.clear(); int r = fcin.read( buffer );
if (r==- 1 ) {
break ;
} buffer.flip(); fcout.write( buffer ); |
clear()
方法重设缓冲区,使它可以接受读入的数据。 flip()
方法让缓冲区可以将新读入的数据写入另一个通道。
缓冲区内部细节
本节将介绍 NIO 中两个重要的缓冲区组件:状态变量和访问方法 (accessor)。
状态变量是前一节中提到的"内部统计机制"的关键。每一个读/写操作都会改变缓冲区的状态。通过记录和跟踪这些变化,缓冲区就可能够内部地管理自己的资源。
在从通道读取数据时,数据被放入到缓冲区。在有些情况下,可以将这个缓冲区直接写入另一个通道,但是在一般情况下,您还需要查看数据。这是使用 访问方法 get()
来完成的。同样,如果要将原始数据放入缓冲区中,就要使用访问方法 put()
。
在本节中,您将学习关于 NIO 中的状态变量和访问方法的内容。我们将描述每一个组件,并让您有机会看到它的实际应用。虽然 NIO 的内部统计机制初看起来可能很复杂,但是您很快就会看到大部分的实际工作都已经替您完成了。您可能习惯于通过手工编码进行簿记 ― 即使用字节数组和索引变量,现在它已在 NIO 中内部地处理了。
可以用三个值指定缓冲区在任意时刻的状态:
position
limit
capacity
这三个变量一起可以跟踪缓冲区的状态和它所包含的数据。我们将在下面的小节中详细分析每一个变量,还要介绍它们如何适应典型的读/写(输入/输出)进程。在这个例子中,我们假定要将数据从一个输入通道拷贝到一个输出通道。
您可以回想一下,缓冲区实际上就是美化了的数组。在从通道读取时,您将所读取的数据放到底层的数组中。 position
变量跟踪已经写了多少数据。更准确地说,它指定了下一个字节将放到数组的哪一个元素中。因此,如果您从通道中读三个字节到缓冲区中,那么缓冲区的 position
将会设置为3,指向数组中第四个元素。
同样,在写入通道时,您是从缓冲区中获取数据。 position
值跟踪从缓冲区中获取了多少数据。更准确地说,它指定下一个字节来自数组的哪一个元素。因此如果从缓冲区写了5个字节到通道中,那么缓冲区的 position
将被设置为5,指向数组的第六个元素。
limit
变量表明还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。
position
总是小于或者等于 limit
。
缓冲区的 capacity
表明可以储存在缓冲区中的最大数据容量。实际上,它指定了底层数组的大小 ― 或者至少是指定了准许我们使用的底层数组的容量。
limit
决不能大于 capacity
。
我们首先观察一个新创建的缓冲区。出于本例子的需要,我们假设这个缓冲区的 总容量
为8个字节。 Buffer
的状态如下所示:
回想一下 ,limit
决不能大于 capacity
,此例中这两个值都被设置为 8。我们通过将它们指向数组的尾部之后(如果有第8个槽,则是第8个槽所在的位置)来说明这点。
position
设置为0。如果我们读一些数据到缓冲区中,那么下一个读取的数据就进入 slot 0 。如果我们从缓冲区写一些数据,从缓冲区读取的下一个字节就来自 slot 0 。 position
设置如下所示:
由于 capacity
不会改变,所以我们在下面的讨论中可以忽略它。
现在我们可以开始在新创建的缓冲区上进行读/写操作。首先从输入通道中读一些数据到缓冲区中。第一次读取得到三个字节。它们被放到数组中从 position
开始的位置,这时 position 被设置为 0。读完之后,position 就增加到 3,如下所示:
limit
没有改变。
在第二次读取时,我们从输入通道读取另外两个字节到缓冲区中。这两个字节储存在由 position
所指定的位置上, position
因而增加 2:
limit
没有改变。
现在我们要将数据写到输出通道中。在这之前,我们必须调用 flip()
方法。这个方法做两件非常重要的事:
- 它将
limit
设置为当前position
。 - 它将
position
设置为 0。
前一小节中的图显示了在 flip 之前缓冲区的情况。下面是在 flip 之后的缓冲区:
我们现在可以将数据从缓冲区写入通道了。 position
被设置为 0,这意味着我们得到的下一个字节是第一个字节。 limit
已被设置为原来的 position
,这意味着它包括以前读到的所有字节,并且一个字节也不多。
在第一次写入时,我们从缓冲区中取四个字节并将它们写入输出通道。这使得 position
增加到 4,而 limit
不变,如下所示:
我们只剩下一个字节可写了。 limit
在我们调用 flip()
时被设置为 5,并且 position
不能超过 limit
。所以最后一次写入操作从缓冲区取出一个字节并将它写入输出通道。这使得 position
增加到 5,并保持 limit
不变,如下所示:
最后一步是调用缓冲区的 clear()
方法。这个方法重设缓冲区以便接收更多的字节。 Clear
做两种非常重要的事情:
- 它将
limit
设置为与capacity
相同。 - 它设置
position
为 0。
下图显示了在调用 clear()
后缓冲区的状态:
缓冲区现在可以接收新的数据了。
到目前为止,我们只是使用缓冲区将数据从一个通道转移到另一个通道。然而,程序经常需要直接处理数据。例如,您可能需要将用户数据保存到磁盘。在这种情况下,您必须将这些数据直接放入缓冲区,然后用通道将缓冲区写入磁盘。
或者,您可能想要从磁盘读取用户数据。在这种情况下,您要将数据从通道读到缓冲区中,然后检查缓冲区中的数据。
在本节的最后,我们将详细分析如何使用 ByteBuffer
类的 get()
和 put()
方法直接访问缓冲区中的数据。
ByteBuffer
类中有四个 get()
方法:
byte get();
ByteBuffer get( byte dst[] );
ByteBuffer get( byte dst[], int offset, int length );
byte get( int index );
第一个方法获取单个字节。第二和第三个方法将一组字节读到一个数组中。第四个方法从缓冲区中的特定位置获取字节。那些返回ByteBuffer
的方法只是返回调用它们的缓冲区的 this
值。
此外,我们认为前三个 get()
方法是相对的,而最后一个方法是绝对的。 相对 意味着 get()
操作服从 limit
和 position
值 ― 更明确地说,字节是从当前 position
读取的,而 position
在 get
之后会增加。另一方面,一个 绝对 方法会忽略 limit
和 position
值,也不会影响它们。事实上,它完全绕过了缓冲区的统计方法。
上面列出的方法对应于 ByteBuffer
类。其他类有等价的 get()
方法,这些方法除了不是处理字节外,其它方面是是完全一样的,它们处理的是与该缓冲区类相适应的类型。
ByteBuffer
类中有五个 put()
方法:
ByteBuffer put( byte b );
ByteBuffer put( byte src[] );
ByteBuffer put( byte src[], int offset, int length );
ByteBuffer put( ByteBuffer src );
ByteBuffer put( int index, byte b );
第一个方法 写入(put)
单个字节。第二和第三个方法写入来自一个数组的一组字节。第四个方法将数据从一个给定的源ByteBuffer
写入这个 ByteBuffer
。第五个方法将字节写入缓冲区中特定的 位置
。那些返回 ByteBuffer
的方法只是返回调用它们的缓冲区的 this
值。
与 get()
方法一样,我们将把 put()
方法划分为 相对 或者 绝对 的。前四个方法是相对的,而第五个方法是绝对的。
上面显示的方法对应于 ByteBuffer
类。其他类有等价的 put()
方法,这些方法除了不是处理字节之外,其它方面是完全一样的。它们处理的是与该缓冲区类相适应的类型。
除了前些小节中描述的 get()
和 put()
方法, ByteBuffer
还有用于读写不同类型的值的其他方法,如下所示:
getByte()
getChar()
getShort()
getInt()
getLong()
getFloat()
getDouble()
putByte()
putChar()
putShort()
putInt()
putLong()
putFloat()
putDouble()
事实上,这其中的每个方法都有两种类型 ― 一种是相对的,另一种是绝对的。它们对于读取格式化的二进制数据(如图像文件的头部)很有用。
您可以在例子程序 TypesInByteBuffer.java 中看到这些方法的实际应用。
下面的内部循环概括了使用缓冲区将数据从输入通道拷贝到输出通道的过程。
while ( true ) {
buffer.clear();
int r = fcin.read( buffer );
if (r==- 1 ) {
break ;
}
buffer.flip();
fcout.write( buffer );
} |
read()
和 write()
调用得到了极大的简化,因为许多工作细节都由缓冲区完成了。 clear()
和 flip()
方法用于让缓冲区在读和写之间切换。
关于缓冲区的更多内容
到目前为止,您已经学习了使用缓冲区进行日常工作所需要掌握的大部分内容。我们的例子没怎么超出标准的读/写过程种类,在原来的 I/O 中可以像在 NIO 中一样容易地实现这样的标准读写过程。
本节将讨论使用缓冲区的一些更复杂的方面,比如缓冲区分配、包装和分片。我们还会讨论 NIO 带给 Java 平台的一些新功能。您将学到如何创建不同类型的缓冲区以达到不同的目的,如可保护数据不被修改的 只读 缓冲区,和直接映射到底层操作系统缓冲区的 直接 缓冲区。我们将在本节的最后介绍如何在 NIO 中创建内存映射文件。
在能够读和写之前,必须有一个缓冲区。要创建缓冲区,您必须 分配 它。我们使用静态方法 allocate()
来分配缓冲区:
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
|
allocate()
方法分配一个具有指定大小的底层数组,并将它包装到一个缓冲区对象中 ― 在本例中是一个 ByteBuffer
。
您还可以将一个现有的数组转换为缓冲区,如下所示:
byte array[] = new byte [ 1024 ];
ByteBuffer buffer = ByteBuffer.wrap( array ); |
本例使用了 wrap()
方法将一个数组包装为缓冲区。必须非常小心地进行这类操作。一旦完成包装,底层数据就可以通过缓冲区或者直接访问。
slice()
方法根据现有的缓冲区创建一种 子缓冲区 。也就是说,它创建一个新的缓冲区,新缓冲区与原来的缓冲区的一部分共享数据。
使用例子可以最好地说明这点。让我们首先创建一个长度为 10 的 ByteBuffer
:
ByteBuffer buffer = ByteBuffer.allocate( 10 )
|
然后使用数据来填充这个缓冲区,在第 n 个槽中放入数字 n:
for ( int i= 0 ; i<buffer.capacity(); ++i) {
buffer.put( ( byte )i );
} |
现在我们对这个缓冲区 分片 ,以创建一个包含槽 3 到槽 6 的子缓冲区。在某种意义上,子缓冲区就像原来的缓冲区中的一个 窗口 。
窗口的起始和结束位置通过设置 position
和 limit
值来指定,然后调用 Buffer
的 slice()
方法:
buffer.position( 3 );
buffer.limit( 7 );
ByteBuffer slice = buffer.slice(); |
片
是缓冲区的 子缓冲区
。不过, 片段
和 缓冲区
共享同一个底层数据数组,我们在下一节将会看到这一点。
我们已经创建了原缓冲区的子缓冲区,并且我们知道缓冲区和子缓冲区共享同一个底层数据数组。让我们看看这意味着什么。
我们遍历子缓冲区,将每一个元素乘以 11 来改变它。例如,5 会变成 55。
for ( int i= 0 ; i<slice.capacity(); ++i) {
byte b = slice.get( i );
b *= 11 ;
slice.put( i, b );
} |
最后,再看一下原缓冲区中的内容:
buffer.position( 0 );
buffer.limit( buffer.capacity() ); while (buffer.remaining()> 0 ) {
System.out.println( buffer.get() );
} |
结果表明只有在子缓冲区窗口中的元素被改变了:
$ java SliceBuffer 0 1 2 33 44 55 66 7 8 9 |
缓冲区片对于促进抽象非常有帮助。可以编写自己的函数处理整个缓冲区,而且如果想要将这个过程应用于子缓冲区上,您只需取主缓冲区的一个片,并将它传递给您的函数。这比编写自己的函数来取额外的参数以指定要对缓冲区的哪一部分进行操作更容易。
只读缓冲区非常简单 ― 您可以读取它们,但是不能向它们写入。可以通过调用缓冲区的 asReadOnlyBuffer()
方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区(并与其共享数据),只不过它是只读的。
只读缓冲区对于保护数据很有用。在将缓冲区传递给某个对象的方法时,您无法知道这个方法是否会修改缓冲区中的数据。创建一个只读的缓冲区可以 保证 该缓冲区不会被修改。
不能将只读的缓冲区转换为可写的缓冲区。
另一种有用的 ByteBuffer
是直接缓冲区。 直接缓冲区 是为加快 I/O 速度,而以一种特殊的方式分配其内存的缓冲区。
实际上,直接缓冲区的准确定义是与实现相关的。Sun 的文档是这样描述直接缓冲区的:
给定一个直接字节缓冲区,Java 虚拟机将尽最大努力直接对它执行本机 I/O 操作。也就是说,它会在每一次调用底层操作系统的本机 I/O 操作之前(或之后),尝试避免将缓冲区的内容拷贝到一个中间缓冲区中(或者从一个中间缓冲区中拷贝数据)。
您可以在例子程序 FastCopyFile.java 中看到直接缓冲区的实际应用,这个程序是 CopyFile.java 的另一个版本,它使用了直接缓冲区以提高速度。
还可以用内存映射文件创建直接缓冲区。
内存映射文件 I/O 是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的 I/O 快得多。
内存映射文件 I/O 是通过使文件中的数据神奇般地出现为内存数组的内容来完成的。这其初听起来似乎不过就是将整个文件读到内存中,但是事实上并不是这样。一般来说,只有文件中实际读取或者写入的部分才会送入(或者 映射 )到内存中。
内存映射并不真的神奇或者多么不寻常。现代操作系统一般根据需要将文件的部分映射为内存的部分,从而实现文件系统。Java 内存映射机制不过是在底层操作系统中可以采用这种机制时,提供了对该机制的访问。
尽管创建内存映射文件相当简单,但是向它写入可能是危险的。仅只是改变数组的单个元素这样的简单操作,就可能会直接修改磁盘上的文件。修改数据与将数据保存到磁盘是没有分开的。
了解内存映射的最好方法是使用例子。在下面的例子中,我们要将一个 FileChannel
(它的全部或者部分)映射到内存中。为此我们将使用 FileChannel.map()
方法。下面代码行将文件的前 1024 个字节映射到内存中:
MappedByteBuffer mbb = fc.map( FileChannel.MapMode.READ_WRITE, 0 , 1024 );
|
map()
方法返回一个 MappedByteBuffer
,它是 ByteBuffer
的子类。因此,您可以像使用其他任何 ByteBuffer
一样使用新映射的缓冲区,操作系统会在需要时负责执行行映射。
分散和聚集
分散/聚集 I/O 是使用多个而不是单个缓冲区来保存数据的读写方法。
一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中。同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据。
分散/聚集 I/O 对于将数据流划分为单独的部分很有用,这有助于实现复杂的数据格式。
通道可以有选择地实现两个新的接口: ScatteringByteChannel
和 GatheringByteChannel
。一个 ScatteringByteChannel
是一个具有两个附加读方法的通道:
long read( ByteBuffer[] dsts );
long read( ByteBuffer[] dsts, int offset, int length );
这些 long read()
方法很像标准的 read
方法,只不过它们不是取单个缓冲区而是取一个缓冲区数组。
在 分散读取 中,通道依次填充每个缓冲区。填满一个缓冲区后,它就开始填充下一个。在某种意义上,缓冲区数组就像一个大缓冲区。
分散/聚集 I/O 对于将数据划分为几个部分很有用。例如,您可能在编写一个使用消息对象的网络应用程序,每一个消息被划分为固定长度的头部和固定长度的正文。您可以创建一个刚好可以容纳头部的缓冲区和另一个刚好可以容难正文的缓冲区。当您将它们放入一个数组中并使用分散读取来向它们读入消息时,头部和正文将整齐地划分到这两个缓冲区中。
我们从缓冲区所得到的方便性对于缓冲区数组同样有效。因为每一个缓冲区都跟踪自己还可以接受多少数据,所以分散读取会自动找到有空间接受数据的第一个缓冲区。在这个缓冲区填满后,它就会移动到下一个缓冲区。
聚集写入 类似于分散读取,只不过是用来写入。它也有接受缓冲区数组的方法:
long write( ByteBuffer[] srcs );
long write( ByteBuffer[] srcs, int offset, int length );
聚集写对于把一组单独的缓冲区中组成单个数据流很有用。为了与上面的消息例子保持一致,您可以使用聚集写入来自动将网络消息的各个部分组装为单个数据流,以便跨越网络传输消息。
从例子程序 UseScatterGather.java 中可以看到分散读取和聚集写入的实际应用。
文件锁定
文件锁定初看起来可能让人迷惑。它 似乎 指的是防止程序或者用户访问特定文件。事实上,文件锁就像常规的 Java 对象锁 ― 它们是 劝告式的(advisory) 锁。它们不阻止任何形式的数据访问,相反,它们通过锁的共享和获取赖允许系统的不同部分相互协调。
您可以锁定整个文件或者文件的一部分。如果您获取一个排它锁,那么其他人就不能获得同一个文件或者文件的一部分上的锁。如果您获得一个共享锁,那么其他人可以获得同一个文件或者文件一部分上的共享锁,但是不能获得排它锁。文件锁定并不总是出于保护数据的目的。例如,您可能临时锁定一个文件以保证特定的写操作成为原子的,而不会有其他程序的干扰。
大多数操作系统提供了文件系统锁,但是它们并不都是采用同样的方式。有些实现提供了共享锁,而另一些仅提供了排它锁。事实上,有些实现使得文件的锁定部分不可访问,尽管大多数实现不是这样的。
在本节中,您将学习如何在 NIO 中执行简单的文件锁过程,我们还将探讨一些保证被锁定的文件尽可能可移植的方法。
要获取文件的一部分上的锁,您要调用一个打开的 FileChannel
上的 lock()
方法。注意,如果要获取一个排它锁,您必须以写方式打开文件。
RandomAccessFile raf = new RandomAccessFile( "usefilelocks.txt" , "rw" );
FileChannel fc = raf.getChannel(); FileLock lock = fc.lock( start, end, false );
|
在拥有锁之后,您可以执行需要的任何敏感操作,然后再释放锁:
lock.release(); |
在释放锁后,尝试获得锁的其他任何程序都有机会获得它。
本小节的例子程序 UseFileLocks.java 必须与它自己并行运行。这个程序获取一个文件上的锁,持有三秒钟,然后释放它。如果同时运行这个程序的多个实例,您会看到每个实例依次获得锁。
文件锁定可能是一个复杂的操作,特别是考虑到不同的操作系统是以不同的方式实现锁这一事实。下面的指导原则将帮助您尽可能保持代码的可移植性:
- 只使用排它锁。
- 将所有的锁视为劝告式的(advisory)。
-
连网和异步 I/O
连网是学习异步 I/O 的很好基础,而异步 I/O 对于在 Java 语言中执行任何输入/输出过程的人来说,无疑都是必须具备的知识。NIO 中的连网与 NIO 中的其他任何操作没有什么不同 ― 它依赖通道和缓冲区,而您通常使用
InputStream
和OutputStream
来获得通道。本节首先介绍异步 I/O 的基础 ― 它是什么以及它不是什么,然后转向更实用的、程序性的例子。
异步 I/O 是一种 没有阻塞地 读写数据的方法。通常,在代码进行
read()
调用时,代码会阻塞直至有可供读取的数据。同样,write()
调用将会阻塞直至数据能够写入。另一方面,异步 I/O 调用不会阻塞。相反,您将注册对特定 I/O 事件的兴趣 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉您。
异步 I/O 的一个优势在于,它允许您同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,您可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。
我们将通过研究一个名为 MultiPortEcho.java 的例子程序来查看异步 I/O 的实际应用。这个程序就像传统的 echo server,它接受网络连接并向它们回响它们可能发送的数据。不过它有一个附加的特性,就是它能同时监听多个端口,并处理来自所有这些端口的连接。并且它只在单个线程中完成所有这些工作。
本节的阐述对应于
MultiPortEcho
的源代码中的go()
方法的实现,因此应该看一下源代码,以便对所发生的事情有个更全面的了解。异步 I/O 中的核心对象名为
Selector
。Selector
就是您注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。所以,我们需要做的第一件事就是创建一个
Selector
:Selector selector = Selector.open();
然后,我们将对不同的通道对象调用
register()
方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register()
的第一个参数总是这个Selector
。为了接收连接,我们需要一个
ServerSocketChannel
。事实上,我们要监听的每一个端口都需要有一个ServerSocketChannel
。对于每一个端口,我们打开一个ServerSocketChannel
,如下所示:ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(
false
);
ServerSocket ss = ssc.socket();
InetSocketAddress address =
new
InetSocketAddress( ports[i] );
ss.bind( address );
第一行创建一个新的
ServerSocketChannel
,最后三行将它绑定到给定的端口。第二行将ServerSocketChannel
设置为 非阻塞的 。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。下一步是将新打开的
ServerSocketChannels
注册到Selector
上。为此我们使用 ServerSocketChannel.register() 方法,如下所示SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
register()
的第一个参数总是这个Selector
。第二个参数是OP_ACCEPT
,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于ServerSocketChannel
的唯一事件类型。请注意对
register()
的调用的返回值。SelectionKey
代表这个通道在此Selector
上的这个注册。当某个Selector
通知您某个传入事件时,它是通过提供对应于该事件的SelectionKey
来进行的。SelectionKey
还可以用于取消通道的注册。现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用
Selectors
的几乎每个程序都像下面这样使用内部循环:int
num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while
(it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
// ... deal with I/O event ...
}
首先,我们调用
Selector
的select()
方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时,select()
方法将返回所发生的事件的数量。接下来,我们调用
Selector
的selectedKeys()
方法,它返回发生了事件的SelectionKey
对象的一个集合
。我们通过迭代
SelectionKeys
并依次处理每个SelectionKey
来处理事件。对于每一个SelectionKey
,您必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。程序执行到这里,我们仅注册了
ServerSocketChannel
,并且仅注册它们“接收”事件。为确认这一点,我们对SelectionKey
调用readyOps()
方法,并检查发生了什么类型的事件:if
((key.readyOps() & SelectionKey.OP_ACCEPT)
== SelectionKey.OP_ACCEPT) {
// Accept the new connection
// ...
}
可以肯定地说,
readOps()
方法告诉我们该事件是新的连接。接受新的连接
因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心
accept()
操作会阻塞:ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();
下一步是将新连接的
SocketChannel
配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将SocketChannel
注册到Selector
上,如下所示:sc.configureBlocking(
false
);
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
注意我们使用
register()
的OP_READ
参数,将SocketChannel
注册用于 读取 而不是 接受 新连接。在处理
SelectionKey
之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的SelectionKey
从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的remove()
方法来删除处理过的SelectionKey
:it.remove();
现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。
当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用
Selector.select()
,并返回一个或者多个 I/O 事件。这一次,SelectionKey
将被标记为OP_READ
事件,如下所示:}
else
if
((key.readyOps() & SelectionKey.OP_READ)
== SelectionKey.OP_READ) {
// Read the data
SocketChannel sc = (SocketChannel)key.channel();
// ...
}
与以前一样,我们取得发生 I/O 事件的通道并处理它。在本例中,由于这是一个 echo server,我们只希望从套接字中读取数据并马上将它发送回去。
每次返回主循环,我们都要调用
select
的Selector()
方法,并取得一组SelectionKey
。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除SelectionKey
,然后返回主循环的顶部。这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,您需要通过将通道从
Selector
中删除来处理关闭的通道。而且您可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。字符集
根据 Sun 的文档,一个
Charset
是“十六位 Unicode 字符序列与字节序列之间的一个命名的映射”。实际上,一个Charset
允许您以尽可能最具可移植性的方式读写字符序列。Java 语言被定义为基于 Unicode。然而在实际上,许多人编写代码时都假设一个字符在磁盘上或者在网络流中用一个字节表示。这种假设在许多情况下成立,但是并不是在所有情况下都成立,而且随着计算机变得对 Unicode 越来越友好,这个假设就日益变得不能成立了。
在本节中,我们将看一下如何使用
Charsets
以适合现代文本格式的方式处理文本数据。这里将使用的示例程序相当简单,不过,它触及了使用Charset
的所有关键方面:为给定的字符编码创建Charset
,以及使用该Charset
解码和编码文本数据。要读和写文本,我们要分别使用
CharsetDecoder
和CharsetEncoder
。将它们称为 编码器 和 解码器 是有道理的。一个 字符 不再表示一个特定的位模式,而是表示字符系统中的一个实体。因此,由某个实际的位模式表示的字符必须以某种特定的 编码 来表示。CharsetDecoder
用于将逐位表示的一串字符转换为具体的char
值。同样,一个CharsetEncoder
用于将字符转换回位。在下一个小节中,我们将考察一个使用这些对象来读写数据的程序。
现在我们将分析这个例子程序 UseCharsets.java。这个程序非常简单 ― 它从一个文件中读取一些文本,并将该文本写入另一个文件。但是它把该数据当作文本数据,并使用
CharBuffer
来将该数句读入一个CharsetDecoder
中。同样,它使用CharsetEncoder
来写回该数据。我们将假设字符以 ISO-8859-1(Latin1) 字符集(这是 ASCII 的标准扩展)的形式储存在磁盘上。尽管我们必须为使用 Unicode 做好准备,但是也必须认识到不同的文件是以不同的格式储存的,而 ASCII 无疑是非常普遍的一种格式。事实上,每种 Java 实现都要求对以下字符编码提供完全的支持:
- US-ASCII
- ISO-8859-1
- UTF-8
- UTF-16BE
- UTF-16LE
- UTF-16
-
在打开相应的文件、将输入数据读入名为
inputData
的ByteBuffer
之后,我们的程序必须创建 ISO-8859-1 (Latin1) 字符集的一个实例:Charset latin1 = Charset.forName(
"ISO-8859-1"
);
然后,创建一个解码器(用于读取)和一个编码器 (用于写入):
CharsetDecoder decoder = latin1.newDecoder();
CharsetEncoder encoder = latin1.newEncoder();
为了将字节数据解码为一组字符,我们把
ByteBuffer
传递给CharsetDecoder
,结果得到一个CharBuffer
CharBuffer cb = decoder.decode( inputData );
如果想要处理字符,我们可以在程序的此处进行。但是我们只想无改变地将它写回,所以没有什么要做的。
要写回数据,我们必须使用
CharsetEncoder
将它转换回字节:ByteBuffer outputData = encoder.encode( cb );
在转换完成之后,我们就可以将数据写到文件中了。
结束语和参考资料
正如您所看到的, NIO 库有大量的特性。在一些新特性(例如文件锁定和字符集)提供新功能的同时,许多特性在优化方面也非常优秀。
在基础层次上,通道和缓冲区可以做的事情几乎都可以用原来的面向流的类来完成。但是通道和缓冲区允许以 快得多 的方式完成这些相同的旧操作 ― 事实上接近系统所允许的最大速度。
不过 NIO 最强大的长度之一在于,它提供了一种在 Java 语言中执行进行输入/输出的新的(也是迫切需要的)结构化方式。随诸如缓冲区、通道和异步 I/O 这些概念性(且可实现的)实体而来的,是我们重新思考 Java 程序中的 I/O过程的机会。这样,NIO 甚至为我们最熟悉的 I/O 过程也带来了新的活力,同时赋予我们通过和以前不同并且更好的方式执行它们的机会。
-
目录:
一.java NIO 和阻塞I/O的区别
1. 阻塞I/O通信模型
2. java NIO原理及通信模型
二.java NIO服务端和客户端代码实现
具体分析:
一.java NIO 和阻塞I/O的区别
1. 阻塞I/O通信模型
假如现在你对阻塞I/O已有了一定了解,我们知道阻塞I/O在调用InputStream.read()方法时是阻塞的,它会一直等到数据到来时(或超时)才会返回;同样,在调用ServerSocket.accept()方法时,也会一直阻塞到有客户端连接才会返回,每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。阻塞I/O的通信模型示意图如下:
如果你细细分析,一定会发现阻塞I/O存在一些缺点。根据阻塞I/O通信模型,我总结了它的两点缺点:
1. 当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些CPU时间
2. 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。
在这种情况下非阻塞式I/O就有了它的应用前景。
2. java NIO原理及通信模型
Java NIO是在jdk1.4开始使用的,它既可以说成“新I/O”,也可以说成非阻塞式I/O。下面是java NIO的工作原理:
1. 由一个专门的线程来处理所有的 IO 事件,并负责分发。
2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。
阅读过一些资料之后,下面贴出我理解的java NIO的工作原理图:
(注:每个线程的处理流程大概都是读取数据、解码、计算处理、编码、发送响应。)
Java NIO的服务端只需启动一个专门的线程来处理所有的 IO 事件,这种通信模型是怎么实现的呢?呵呵,我们一起来探究它的奥秘吧。java NIO采用了双向通道(channel)进行数据传输,而不是单向的流(stream),在通道上可以注册我们感兴趣的事件。一共有以下四种事件:
事件名
对应值
服务端接收客户端连接事件
SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件
SelectionKey.OP_CONNECT(8)
读事件
SelectionKey.OP_READ(1)
写事件
SelectionKey.OP_WRITE(4)
服务端和客户端各自维护一个管理通道的对象,我们称之为selector,该对象能检测一个或多个通道 (channel) 上的事件。我们以服务端为例,如果服务端的selector上注册了读事件,某时刻客户端给服务端发送了一些数据,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端会在selector中添加一个读事件。服务端的处理线程会轮询地访问selector,如果访问selector时发现有感兴趣的事件到达,则处理这些事件,如果没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。下面是我理解的java NIO的通信模型示意图:
二.java NIO服务端和客户端代码实现
为了更好地理解java NIO,下面贴出服务端和客户端的简单代码实现。
服务端:
- package cn.nio;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- /**
- * NIO服务端
- * @author 小路
- */
- public class NIOServer {
- //通道管理器
- private Selector selector;
- /**
- * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
- * @param port 绑定的端口号
- * @throws IOException
- */
- public void initServer(int port) throws IOException {
- // 获得一个ServerSocket通道
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- // 设置通道为非阻塞
- serverChannel.configureBlocking(false);
- // 将该通道对应的ServerSocket绑定到port端口
- serverChannel.socket().bind(new InetSocketAddress(port));
- // 获得一个通道管理器
- this.selector = Selector.open();
- //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
- //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
- serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- }
- /**
- * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- public void listen() throws IOException {
- System.out.println("服务端启动成功!");
- // 轮询访问selector
- while (true) {
- //当注册的事件到达时,方法返回;否则,该方法会一直阻塞
- selector.select();
- // 获得selector中选中的项的迭代器,选中的项为注册的事件
- Iterator ite = this.selector.selectedKeys().iterator();
- while (ite.hasNext()) {
- SelectionKey key = (SelectionKey) ite.next();
- // 删除已选的key,以防重复处理
- ite.remove();
- // 客户端请求连接事件
- if (key.isAcceptable()) {
- ServerSocketChannel server = (ServerSocketChannel) key
- .channel();
- // 获得和客户端连接的通道
- SocketChannel channel = server.accept();
- // 设置成非阻塞
- channel.configureBlocking(false);
- //在这里可以给客户端发送信息哦
- channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
- //在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
- channel.register(this.selector, SelectionKey.OP_READ);
- // 获得了可读的事件
- } else if (key.isReadable()) {
- read(key);
- }
- }
- }
- }
- /**
- * 处理读取客户端发来的信息 的事件
- * @param key
- * @throws IOException
- */
- public void read(SelectionKey key) throws IOException{
- // 服务器可读取消息:得到事件发生的Socket通道
- SocketChannel channel = (SocketChannel) key.channel();
- // 创建读取的缓冲区
- ByteBuffer buffer = ByteBuffer.allocate(10);
- channel.read(buffer);
- byte[] data = buffer.array();
- String msg = new String(data).trim();
- System.out.println("服务端收到信息:"+msg);
- ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
- channel.write(outBuffer);// 将消息回送给客户端
- }
- /**
- * 启动服务端测试
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- NIOServer server = new NIOServer();
- server.initServer(8000);
- server.listen();
- }
- }
客户端:
- package cn.nio;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- /**
- * NIO客户端
- * @author 小路
- */
- public class NIOClient {
- //通道管理器
- private Selector selector;
- /**
- * 获得一个Socket通道,并对该通道做一些初始化的工作
- * @param ip 连接的服务器的ip
- * @param port 连接的服务器的端口号
- * @throws IOException
- */
- public void initClient(String ip,int port) throws IOException {
- // 获得一个Socket通道
- SocketChannel channel = SocketChannel.open();
- // 设置通道为非阻塞
- channel.configureBlocking(false);
- // 获得一个通道管理器
- this.selector = Selector.open();
- // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
- //用channel.finishConnect();才能完成连接
- channel.connect(new InetSocketAddress(ip,port));
- //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
- channel.register(selector, SelectionKey.OP_CONNECT);
- }
- /**
- * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- public void listen() throws IOException {
- // 轮询访问selector
- while (true) {
- selector.select();
- // 获得selector中选中的项的迭代器
- Iterator ite = this.selector.selectedKeys().iterator();
- while (ite.hasNext()) {
- SelectionKey key = (SelectionKey) ite.next();
- // 删除已选的key,以防重复处理
- ite.remove();
- // 连接事件发生
- if (key.isConnectable()) {
- SocketChannel channel = (SocketChannel) key
- .channel();
- // 如果正在连接,则完成连接
- if(channel.isConnectionPending()){
- channel.finishConnect();
- }
- // 设置成非阻塞
- channel.configureBlocking(false);
- //在这里可以给服务端发送信息哦
- channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
- //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
- channel.register(this.selector, SelectionKey.OP_READ);
- // 获得了可读的事件
- } else if (key.isReadable()) {
- read(key);
- }
- }
- }
- }
- /**
- * 处理读取服务端发来的信息 的事件
- * @param key
- * @throws IOException
- */
- public void read(SelectionKey key) throws IOException{
- //和服务端的read方法一样
- }
- /**
- * 启动客户端测试
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- NIOClient client = new NIOClient();
- client.initClient("localhost",8000);
- client.listen();
- }
- }
相关推荐
8. **NIO(New Input/Output)**: Java 1.4引入了NIO(New IO),它提供了非阻塞的I/O操作,使用选择器(Selector)和通道(Channel)来提高并发性能。通道类如FileChannel、SocketChannel和DatagramChannel,以及...
### Java NIO 处理超大数据文件的知识点详解 ...综上所述,使用Java NIO处理超大数据文件时,关键是利用好内存映射文件技术和合理的数据读取策略,通过适当的分块和数据解析方法,可以有效地提升读取速度和处理能力。
### Java NIO 详细教程知识点解析 #### 一、Java NIO 概述 Java NIO(New IO)是Java平台提供的一种新的IO操作模式,它首次出现在Java 1.4版本中,并在后续版本中不断完善。Java NIO 的设计目的是为了克服传统Java ...
Java_NIO类库Selector机制解析.docJava_NIO类库Selector机制解析.docJava_NIO类库Selector机制解析.docJava_NIO类库Selector机制解析.doc
Java NIO,即Non-Blocking I/O,是Java在JDK 1.4引入的一套新的I/O API,旨在提供一种更加高效的方式来处理I/O操作,尤其是对于网络编程和高并发场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器...
Java NIO,全称为Non-blocking Input/Output,是Java在1.4版本引入的一个新特性,旨在提供一种更高效、更灵活的I/O操作方式。相比于传统的BIO(Blocking I/O),NIO允许我们以非阻塞的方式读写数据,提高了系统资源...
Java NIO(New Input/Output)是Java标准库中提供的一种I/O模型,与传统的 Blocking I/O 不同,NIO 具有非阻塞的特性,可以提高在高并发场景下的处理能力。在这个实例中,"java NIO 消息推送实例" 旨在展示如何使用...
在Java网络编程中,`ServerBootstrap` 是一个用于构建高性能异步网络应用的工具,它主要用在NIO(Non-blocking I/O)模式下。NIO(非阻塞I/O)是Java提供的一种I/O模型,它允许一个线程处理多个输入/输出流,与传统...
Java NIO(Non-blocking Input/Output)是一种在Java中处理I/O操作的新方式,相比于传统的BIO(Blocking I/O),NIO提供了更高效的数据传输能力,尤其适合于高并发、低延迟的网络应用,如聊天服务器。在这个场景下,...
《Netty核心技术及剖析》是针对Java NIO框架和Netty框架的一份深入解析文档,旨在帮助Java开发人员更好地理解和运用这两个技术。Netty是一个高性能、异步事件驱动的网络应用框架,它为协议服务器和客户端提供了丰富...
3. **NIO.2**:引入了非阻塞I/O(New I/O)框架,扩展了Java对异步I/O的支持。 4. **动态代理**:增强了Java代理机制,支持运行时创建接口代理。 5. **脚本引擎支持**:通过JSR 223,JDK 6引入了对JavaScript和其他...
Java NIO(New Input/Output)是Java标准库在JDK 1.4引入的一套全新的I/O API,它的设计目标是提供一种更有效、更灵活的I/O操作方式,尤其是在处理大量并发连接时,其性能表现显著优于传统的阻塞I/O模型。NIO的核心...
### Java NIO通信框架在电信领域的实践 #### 华为电信软件技术架构演进 **1.1 电信软件概述** 电信软件是一个宽泛的概念,根据功能和应用场景的不同大致可以分为两大类:系统软件和业务应用软件。系统软件通常...
根据提供的文件信息,本文将围绕Java NIO(New Input/Output)的相关知识点进行详细解析,同时探讨相关的视频学习资源。 ### Java NIO简介 Java NIO(New Input/Output),即新输入输出,是Java 1.4版本引入的一个...
总的来说,Java NIO异步长连接服务端与客户端的实现涉及到网络编程、多路复用、缓冲区操作以及数据解析等多个知识点。通过合理利用NIO,可以构建高效、可扩展的网络应用程序。对于客户端读取的数据处理,可以根据...
Java NIO(New Input/Output)是Java提供的一种新的I/O操作方式,它在JDK 1.4中引入,极大地提高了文件操作的效率和灵活性。NIO与IO(Input/Output)相比,提供了更加丰富的API和更好的性能。本文将详细介绍Java NIO...
全面理解 Java 网络编程 - BIO、NIO、AIO 本课程旨在帮助学生全面理解 Java 网络编程中的 BIO、NIO、AIO 三剑客,掌握 RPC 编程的基础知识,并结合实战项目巩固所学。 一、网络编程三剑客 - BIO、NIO、AIO BIO...