在JDK1.4之前,Java OutputStream的write方法、InputStream的Read方法和ServerSocket的accept()方法都是阻塞方法,JDK1.4之前Java引入了新的输入输出系统(New Input/Out,NIO),非阻塞是Java NIO实现的重要功能之一 。
1、Buffer
缓冲区,传输数据使用,本质是一个数组,Channel中读数据和写数据都只能通过Buffer传输。
2、Channel
通道,所有的IO流在NIO中都是从Channel开始的,数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中,是Buffer对象的唯一接口。
3、Selector
选择器,它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。
使用一个 select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。而不需要为每个 channel 去分配一个线程。
4、SelectionKey
包含了事件的状态信息和时间对应的通道的绑定。
使用NIO的步骤:
1\创建一个Selector实例
2\将该实例注册到各种通道,指定每个通道上感兴趣的IO操作
3\重复执行,选择器循环:
31\调用一种Select()方法
32\获取已选键值
33\对于已选中键集中的每一个键:
331\将已选键从键集中移除
332\获取信道,并从键中获取附件
333\确定准备就绪的操作并执行;对于accept操作获得的SocketChannel对象,需将信道设为非阻塞模式,并将其注册到选择器中
334\根据需要,修改键的兴趣操作集
如下代码:
NIOServer.java package org.hadoopinternal.nio; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NIOServer { private static final int TIMEOUT = 300; private static final int PORT = 12112; public static void main(String[] args) { try { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.configureBlocking(false); listenChannel.socket().bind(new InetSocketAddress(PORT)); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while(true) { if(selector.select(TIMEOUT)==0) { System.out.print("."); continue; } Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while( iter.hasNext() ) { SelectionKey key = iter.next(); iter.remove(); //Server socket channel has pending connection request? if( key.isAcceptable() ) { SocketChannel channel=listenChannel.accept(); channel.configureBlocking(false); SelectionKey connkey=channel.register(selector, SelectionKey.OP_READ ); NIOServerConnection conn=new NIOServerConnection(connkey); connkey.attach(conn); } if( key.isReadable() ) { NIOServerConnection conn=(NIOServerConnection) key.attachment(); conn.handleRead(); } if( key.isValid() && key.isWritable() ) { NIOServerConnection conn=(NIOServerConnection) key.attachment(); conn.handleWrite(); } } } } catch (Exception e) { e.printStackTrace(); } } } NIOServerConnection.java: package org.hadoopinternal.nio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; public class NIOServerConnection { private static final int BUFFSIZE = 1024; SelectionKey key; SocketChannel channel; ByteBuffer buffer; public NIOServerConnection(SelectionKey key) { this.key=key; this.channel=(SocketChannel) key.channel(); buffer=ByteBuffer.allocate(BUFFSIZE); } public void handleRead() throws IOException { long bytesRead=channel.read(buffer); if(bytesRead==-1) { channel.close(); } else { key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE ); } } public void handleWrite() throws IOException { buffer.flip(); channel.write(buffer); if(!buffer.hasRemaining()) { key.interestOps( SelectionKey.OP_READ ); } buffer.compact(); } }
如下代码为hadoop IPC Server 中的Listener是一个标准的NIO应用:
/** Listens on the socket. Creates jobs for the handler threads*/ private class Listener extends Thread { private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; //the address we bind at private Random rand = new Random(); private long lastCleanupRunTime = 0; //the last time when a cleanup connec- //-tion (for idle connections) ran private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); } private class Reader implements Runnable { private volatile boolean adding = false; private Selector readSelector = null; Reader(Selector readSelector) { this.readSelector = readSelector; } public void run() { LOG.info("Starting SocketReader"); synchronized (this) { while (running) { SelectionKey key = null; try { readSelector.select(); while (adding) { this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } key = null; } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } catch (IOException ex) { LOG.error("Error in Reader", ex); } } } } /** * This gets reader into the state that waits for the new channel * to be registered with readSelector. If it was waiting in select() * the thread will be woken up, otherwise whenever select() is called * it will return even if there is nothing to read and wait * in while(adding) for finishAdd call */ public void startAdd() { adding = true; readSelector.wakeup(); } public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException { return channel.register(readSelector, SelectionKey.OP_READ); } public synchronized void finishAdd() { adding = false; this.notify(); } } /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections * that will be cleanedup per run. The criteria for cleanup is the time * for which the connection was idle. If 'force' is true then all * connections will be looked at for the cleanup. */ private void cleanupConnections(boolean force) { if (force || numConnections > thresholdIdleConnections) { long currentTime = System.currentTimeMillis(); if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { return; } int start = 0; int end = numConnections - 1; if (!force) { start = rand.nextInt() % numConnections; end = rand.nextInt() % numConnections; int temp; if (end < start) { temp = start; start = end; end = temp; } } int i = start; int numNuked = 0; while (i <= end) { Connection c; synchronized (connectionList) { try { c = connectionList.get(i); } catch (Exception e) {return;} } if (c.timedOut(currentTime)) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); numNuked++; end--; c = null; if (!force && numNuked == maxConnectionsToNuke) break; } else i++; } lastCleanupRunTime = System.currentTimeMillis(); } } @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { // we can run out of memory if we have too many threads // log the event and sleep for a minute and give // some thread(s) a chance to finish LOG.warn("Out of Memory in server select", e); closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } cleanupConnections(false); } LOG.info("Stopping " + this.getName()); synchronized (this) { try { acceptChannel.close(); selector.close(); } catch (IOException e) { } selector= null; acceptChannel= null; // clean up all connections while (!connectionList.isEmpty()) { closeConnection(connectionList.remove(0)); } } } private void closeCurrentConnection(SelectionKey key, Throwable e) { if (key != null) { Connection c = (Connection)key.attachment(); if (c != null) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); c = null; } } } InetSocketAddress getAddress() { return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); } void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections + "; # queued calls: " + callQueue.size()); } finally { reader.finishAdd(); } } } void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c + ". Number of active connections: "+ numConnections); closeConnection(c); c = null; } else { c.setLastContact(System.currentTimeMillis()); } } synchronized void doStop() { if (selector != null) { selector.wakeup(); Thread.yield(); } if (acceptChannel != null) { try { acceptChannel.socket().close(); } catch (IOException e) { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } readPool.shutdown(); } // The method that will return the next reader to work with // Simplistic implementation of round robin for now Reader getReader() { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; } }
相关推荐
本例子中的"NioServer"可能是一个简单的Java NIO服务器端程序,用于演示如何使用NIO进行网络通信。下面我们将深入探讨Java NIO的关键组件和工作原理。 1. **通道(Channel)**:通道是数据传输的途径,类似于传统的...
#### 三、Java NIO 使用实例 下面是一个简单的 Java NIO 示例,展示了如何使用 `FileChannel` 和 `ByteBuffer` 进行文件复制: ```java package nio; import java.io.FileInputStream; import java.io....
Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...
Java NIO,全称为Non-Blocking Input/Output(非阻塞输入/输出),是Java从1.4版本开始引入的一种新的I/O模型,它为Java应用程序提供了更高效的数据传输方式。传统的Java I/O模型(BIO)在处理大量并发连接时效率较...
在这个javaNIO实例中,我们可以学习到如何利用Java NIO进行文件的读取、写入以及复制操作。 首先,NIO的核心组件包括通道(Channels)、缓冲区(Buffers)和选择器(Selectors)。通道是数据传输的路径,如文件通道...
在这个“JAVA nio的一个简单的例子”中,我们将探讨如何使用Java NIO进行简单的服务器-客户端通信,并计算字符串的哈希值。 在传统的BIO模型中,每个连接都需要一个线程来处理,当并发连接数量增加时,系统会创建...
在这个实例中,"java NIO 消息推送实例" 旨在展示如何使用NIO进行服务器向客户端的消息推送。 1. **Java NIO基础** - **通道(Channels)**:Java NIO 提供了多种通道,如文件通道、套接字通道等,它们代表不同...
本例包含服务器端和客户端,多线程,每线程多次发送,Eclipse工程,启动服务器使用 nu.javafaq.server.NioServer,启动客户端使用 nu.javafaq.client.NioClient。另本例取自javafaq.nv上的程序修改而成
描述中的“java nio 编程一个实例子.服务端程序”提示我们,这个实例是一个服务器端的应用,它利用Java NIO来处理客户端的连接请求。在NIO服务端编程中,通常会使用`Selector`来监听多个通道的事件,当有新的连接、...
本资料"JavaNIO服务器实例Java开发Java经验技巧共6页"可能是某个Java开发者或讲师分享的一份关于如何在Java中构建NIO服务器的教程,涵盖了6个关键页面的内容。尽管具体的细节无法在此直接提供,但我们可以根据Java ...
总结来说,这个压缩包的内容可能涵盖了如何在Java NIO中实例化和使用通道、缓冲区、选择器,以及如何实现一个简单的多线程服务器,利用选择器来监听和处理来自多个客户端的非阻塞I/O请求。这些内容对于理解和使用...
我研究并实现的Java Nio selector例子。java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server...
二、使用Java NIO读取文件 在Java NIO中,读取文件主要涉及FileChannel和ByteBuffer。以下是一个简单的示例: ```java import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels....
Java NIO(New Input/Output)是Java标准库提供的一种I/O模型,它与传统的 Blocking I/O(BIO)模型不同,NIO提供了非阻塞的读写方式,...对于理解和实践Java NIO在网络编程中的应用,这是一个非常有价值的参考实例。
总之,这个Java NIO IM实例是一个很好的学习资源,它演示了如何利用NIO进行高效的网络通信。通过深入理解并实践这个示例,开发者可以更好地掌握Java NIO的核心概念和技术,并将其应用于实际项目中,提升系统的性能和...
Java NIO(New IO)是Java 1.4版本引入的一个新API,全称为Non-blocking Input/Output,它提供了一种不同于传统IO的编程模型,传统IO...在实际编码时,参考博文链接中的代码实例,可以帮助你更好地理解和实践Java NIO。
Java NIO(New Input/Output)是Java标准库中提供的一种I/O模型,与传统的BIO( Blocking I/O)相比,NIO...对于初学者来说,这些源码实例可以帮助理解Java NIO的基本用法和优势,进一步提升在实际项目中的应用能力。
下面是一些使用Java NIO的关键API: - `Selector.open()`:创建一个选择器实例。 - `Channel`接口及其子类,如`FileChannel`、`SocketChannel`等。 - `Buffer`接口及其子类,如`ByteBuffer`、`CharBuffer`等。 - `...