`
chenzehe
  • 浏览: 538172 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Java NIO 使用实例

 
阅读更多

        在JDK1.4之前,Java OutputStream的write方法、InputStream的Read方法和ServerSocket的accept()方法都是阻塞方法,JDK1.4之前Java引入了新的输入输出系统(New Input/Out,NIO),非阻塞是Java NIO实现的重要功能之一 。

 1、Buffer

缓冲区,传输数据使用,本质是一个数组,Channel中读数据和写数据都只能通过Buffer传输。

2、Channel

通道,所有的IO流在NIO中都是从Channel开始的,数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中,是Buffer对象的唯一接口。

3、Selector

选择器,它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。

使用一个 select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。而不需要为每个 channel 去分配一个线程。

4、SelectionKey

包含了事件的状态信息和时间对应的通道的绑定。

 

使用NIO的步骤:

1\创建一个Selector实例

2\将该实例注册到各种通道,指定每个通道上感兴趣的IO操作

3\重复执行,选择器循环:

    31\调用一种Select()方法

    32\获取已选键值

    33\对于已选中键集中的每一个键:

        331\将已选键从键集中移除

        332\获取信道,并从键中获取附件

        333\确定准备就绪的操作并执行;对于accept操作获得的SocketChannel对象,需将信道设为非阻塞模式,并将其注册到选择器中

        334\根据需要,修改键的兴趣操作集

 

如下代码:

 

NIOServer.java

package org.hadoopinternal.nio;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
	private static final int TIMEOUT  = 300;
	private static final int PORT     = 12112;

	public static void main(String[] args) {		
		try {		    
			Selector selector = Selector.open();
			
			ServerSocketChannel listenChannel = ServerSocketChannel.open();
			listenChannel.configureBlocking(false);
			listenChannel.socket().bind(new InetSocketAddress(PORT));
			listenChannel.register(selector, SelectionKey.OP_ACCEPT);
			
			while(true) {
				if(selector.select(TIMEOUT)==0) {
					System.out.print(".");
					continue;
				}
				
				Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
				while( iter.hasNext() ) {
					SelectionKey key = iter.next();
					iter.remove();
					
					//Server socket channel has pending connection request?
					if( key.isAcceptable() ) {
						SocketChannel channel=listenChannel.accept();
						channel.configureBlocking(false);
						SelectionKey connkey=channel.register(selector, SelectionKey.OP_READ );
						NIOServerConnection conn=new NIOServerConnection(connkey);
						connkey.attach(conn);
					}
					
					if( key.isReadable() ) {
						NIOServerConnection conn=(NIOServerConnection) key.attachment();
						conn.handleRead();
					}
					
					if( key.isValid() && key.isWritable() ) {
						NIOServerConnection conn=(NIOServerConnection) key.attachment();
						conn.handleWrite();
					}
				}				
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}


NIOServerConnection.java:

package org.hadoopinternal.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class NIOServerConnection {
	private static final int BUFFSIZE = 1024;
	
	SelectionKey key;
	SocketChannel channel;
	ByteBuffer buffer;
	
	public NIOServerConnection(SelectionKey key) {
		this.key=key;
		this.channel=(SocketChannel) key.channel();
		buffer=ByteBuffer.allocate(BUFFSIZE);
	}
	
	public void handleRead() throws IOException {
		long bytesRead=channel.read(buffer);
		
		if(bytesRead==-1) {
			channel.close();
		} else {
			key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE );
		}
	}

    public void handleWrite() throws IOException {
    	buffer.flip();
    	channel.write(buffer);
    	
    	if(!buffer.hasRemaining()) {
    		key.interestOps( SelectionKey.OP_READ );
    	} 
    	
    	buffer.compact();
	}
}
 

 

如下代码为hadoop IPC Server 中的Listener是一个标准的NIO应用:

 

/** Listens on the socket. Creates jobs for the handler threads*/
  private class Listener extends Thread {
    
    private ServerSocketChannel acceptChannel = null; //the accept channel
    private Selector selector = null; //the selector that we use for the server
    private Reader[] readers = null;
    private int currentReader = 0;
    private InetSocketAddress address; //the address we bind at
    private Random rand = new Random();
    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
                                         //-tion (for idle connections) ran
    private long cleanupInterval = 10000; //the minimum interval between 
                                          //two cleanup runs
    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
    private ExecutorService readPool; 
   
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      for (int i = 0; i < readThreads; i++) {
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
    
    private class Reader implements Runnable {
      private volatile boolean adding = false;
      private Selector readSelector = null;

      Reader(Selector readSelector) {
        this.readSelector = readSelector;
      }
      public void run() {
        LOG.info("Starting SocketReader");
        synchronized (this) {
          while (running) {
            SelectionKey key = null;
            try {
              readSelector.select();
              while (adding) {
                this.wait(1000);
              }              

              Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                key = iter.next();
                iter.remove();
                if (key.isValid()) {
                  if (key.isReadable()) {
                    doRead(key);
                  }
                }
                key = null;
              }
            } catch (InterruptedException e) {
              if (running) {                      // unexpected -- log it
                LOG.info(getName() + " caught: " +
                         StringUtils.stringifyException(e));
              }
            } catch (IOException ex) {
              LOG.error("Error in Reader", ex);
            }
          }
        }
      }

      /**
       * This gets reader into the state that waits for the new channel
       * to be registered with readSelector. If it was waiting in select()
       * the thread will be woken up, otherwise whenever select() is called
       * it will return even if there is nothing to read and wait
       * in while(adding) for finishAdd call
       */
      public void startAdd() {
        adding = true;
        readSelector.wakeup();
      }
      
      public synchronized SelectionKey registerChannel(SocketChannel channel)
                                                          throws IOException {
          return channel.register(readSelector, SelectionKey.OP_READ);
      }

      public synchronized void finishAdd() {
        adding = false;
        this.notify();        
      }
    }

    /** cleanup connections from connectionList. Choose a random range
     * to scan and also have a limit on the number of the connections
     * that will be cleanedup per run. The criteria for cleanup is the time
     * for which the connection was idle. If 'force' is true then all 
     * connections will be looked at for the cleanup.
     */
    private void cleanupConnections(boolean force) {
      if (force || numConnections > thresholdIdleConnections) {
        long currentTime = System.currentTimeMillis();
        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
          return;
        }
        int start = 0;
        int end = numConnections - 1;
        if (!force) {
          start = rand.nextInt() % numConnections;
          end = rand.nextInt() % numConnections;
          int temp;
          if (end < start) {
            temp = start;
            start = end;
            end = temp;
          }
        }
        int i = start;
        int numNuked = 0;
        while (i <= end) {
          Connection c;
          synchronized (connectionList) {
            try {
              c = connectionList.get(i);
            } catch (Exception e) {return;}
          }
          if (c.timedOut(currentTime)) {
            if (LOG.isDebugEnabled())
              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
            closeConnection(c);
            numNuked++;
            end--;
            c = null;
            if (!force && numNuked == maxConnectionsToNuke) break;
          }
          else i++;
        }
        lastCleanupRunTime = System.currentTimeMillis();
      }
    }

    @Override
    public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give 
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;
        
        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }

    private void closeCurrentConnection(SelectionKey key, Throwable e) {
      if (key != null) {
        Connection c = (Connection)key.attachment();
        if (c != null) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
          closeConnection(c);
          c = null;
        }
      }
    }

    InetSocketAddress getAddress() {
      return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
    }
    
    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());          
        } finally {
          reader.finishAdd(); 
        }

      }
    }

    void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + 
                    c + ". Number of active connections: "+
                    numConnections);
        closeConnection(c);
        c = null;
      }
      else {
        c.setLastContact(System.currentTimeMillis());
      }
    }   

    synchronized void doStop() {
      if (selector != null) {
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          LOG.info(getName() + ":Exception in closing listener socket. " + e);
        }
      }
      readPool.shutdown();
    }

    // The method that will return the next reader to work with
    // Simplistic implementation of round robin for now
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }

  }

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    一个java NIO的例子

    本例子中的"NioServer"可能是一个简单的Java NIO服务器端程序,用于演示如何使用NIO进行网络通信。下面我们将深入探讨Java NIO的关键组件和工作原理。 1. **通道(Channel)**:通道是数据传输的途径,类似于传统的...

    java NIO原理和使用

    #### 三、Java NIO 使用实例 下面是一个简单的 Java NIO 示例,展示了如何使用 `FileChannel` 和 `ByteBuffer` 进行文件复制: ```java package nio; import java.io.FileInputStream; import java.io....

    Java NIO 中文 Java NIO 中文 Java NIO 中文文档

    Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...

    java NIO实例

    Java NIO,全称为Non-Blocking Input/Output(非阻塞输入/输出),是Java从1.4版本开始引入的一种新的I/O模型,它为Java应用程序提供了更高效的数据传输方式。传统的Java I/O模型(BIO)在处理大量并发连接时效率较...

    javaNIO实例

    在这个javaNIO实例中,我们可以学习到如何利用Java NIO进行文件的读取、写入以及复制操作。 首先,NIO的核心组件包括通道(Channels)、缓冲区(Buffers)和选择器(Selectors)。通道是数据传输的路径,如文件通道...

    JAVA nio的一个简单的例子

    在这个“JAVA nio的一个简单的例子”中,我们将探讨如何使用Java NIO进行简单的服务器-客户端通信,并计算字符串的哈希值。 在传统的BIO模型中,每个连接都需要一个线程来处理,当并发连接数量增加时,系统会创建...

    java NIO推送实例

    在这个实例中,"java NIO 消息推送实例" 旨在展示如何使用NIO进行服务器向客户端的消息推送。 1. **Java NIO基础** - **通道(Channels)**:Java NIO 提供了多种通道,如文件通道、套接字通道等,它们代表不同...

    java nio socket 例子

    本例包含服务器端和客户端,多线程,每线程多次发送,Eclipse工程,启动服务器使用 nu.javafaq.server.NioServer,启动客户端使用 nu.javafaq.client.NioClient。另本例取自javafaq.nv上的程序修改而成

    nio.rar_NIO_NIO-socket_java nio_java 实例_java.nio

    描述中的“java nio 编程一个实例子.服务端程序”提示我们,这个实例是一个服务器端的应用,它利用Java NIO来处理客户端的连接请求。在NIO服务端编程中,通常会使用`Selector`来监听多个通道的事件,当有新的连接、...

    JavaNIO服务器实例Java开发Java经验技巧共6页

    本资料"JavaNIO服务器实例Java开发Java经验技巧共6页"可能是某个Java开发者或讲师分享的一份关于如何在Java中构建NIO服务器的教程,涵盖了6个关键页面的内容。尽管具体的细节无法在此直接提供,但我们可以根据Java ...

    java-nio.rar_java nio_nio 对象实例化

    总结来说,这个压缩包的内容可能涵盖了如何在Java NIO中实例化和使用通道、缓冲区、选择器,以及如何实现一个简单的多线程服务器,利用选择器来监听和处理来自多个客户端的非阻塞I/O请求。这些内容对于理解和使用...

    Java Nio selector例程

    我研究并实现的Java Nio selector例子。java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server...

    java nio 读文件

    二、使用Java NIO读取文件 在Java NIO中,读取文件主要涉及FileChannel和ByteBuffer。以下是一个简单的示例: ```java import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels....

    基于java NIO的socket通信demo

    Java NIO(New Input/Output)是Java标准库提供的一种I/O模型,它与传统的 Blocking I/O(BIO)模型不同,NIO提供了非阻塞的读写方式,...对于理解和实践Java NIO在网络编程中的应用,这是一个非常有价值的参考实例。

    java nio im(server+client)

    总之,这个Java NIO IM实例是一个很好的学习资源,它演示了如何利用NIO进行高效的网络通信。通过深入理解并实践这个示例,开发者可以更好地掌握Java NIO的核心概念和技术,并将其应用于实际项目中,提升系统的性能和...

    Java NIO原理分析及代码实例

    Java NIO(New IO)是Java 1.4版本引入的一个新API,全称为Non-blocking Input/Output,它提供了一种不同于传统IO的编程模型,传统IO...在实际编码时,参考博文链接中的代码实例,可以帮助你更好地理解和实践Java NIO。

    nio.rar_FastCopyFile.java_NIO_UseFloatBuffer.java_java nio_文件锁

    Java NIO(New Input/Output)是Java标准库中提供的一种I/O模型,与传统的BIO( Blocking I/O)相比,NIO...对于初学者来说,这些源码实例可以帮助理解Java NIO的基本用法和优势,进一步提升在实际项目中的应用能力。

    Java NIO详解及源码下载

    下面是一些使用Java NIO的关键API: - `Selector.open()`:创建一个选择器实例。 - `Channel`接口及其子类,如`FileChannel`、`SocketChannel`等。 - `Buffer`接口及其子类,如`ByteBuffer`、`CharBuffer`等。 - `...

    java nio 入门

    在本文中,我们将深入探讨Java NIO的基本概念、组件以及如何在实际编程中使用。 一、Java NIO概述 Java NIO并非简单的输入输出API的替代,而是提供了一种新的I/O模型。传统IO基于流(Stream)进行数据读写,而NIO基于...

Global site tag (gtag.js) - Google Analytics