`
bd2007
  • 浏览: 394658 次
  • 性别: Icon_minigender_2
  • 来自: 上海
社区版块
存档分类
最新评论

NIO 之 选择就绪模式

阅读更多

    [size=small;]? ? ?[/size][size=small;][size=medium;]Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。[/size][/size]

[size=small;][size=medium;]? ? ? ?Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。[/size][/size]

[size=small;][size=medium;]? ? ? Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。[/size][/size]

?


<p style="text-align: center;">? ? ? ? ? ? ? ? ? ? ?[size=small;]图 1 ? [size=10.5pt;" lang="EN-US]<span><span style="font: 7.0pt ;">? ?[/size][/size]</span><span>类结构图</span></span>
?

?

package cn.chenkangxian.nioconcurrent;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

/**
 * @Project: testNio
 * 
 * @Author: chenkangxian
 * 
 * @Annotation: 使用线程池来处理大量channel并发
 * 
 * @Date:2011-7-5
 * 
 * @Copyright: 2011 chenkangxian, All rights reserved.
 * 
 */
public class SelectSocketsThreadPool extends SelectSockets {

	private static final int MAX_THREADS = 5;
	private ThreadPool pool = new ThreadPool(MAX_THREADS);

	/**
	 * 从socket中读数据
	 */
	protected void readDataFromSocket(SelectionKey key) throws Exception {

		WorkerThread worker = pool.getWorker();
		if (worker == null) {
			return;
		}

		worker.serviceChannel(key);
	}

	/**
	 *  
	 * @Project: concurrentnio
	 *
	 * @Author: chenkangxian
	 *
	 * @Annotation:线程池
	 *
	 * @Date:2011-7-20
	 *
	 * @Copyright: 2011 chenkangxian, All rights reserved.
	 *
	 */
	private class ThreadPool {

		List idle = new LinkedList();

		/**
		 * 线程池初始化
		 * 
		 * @param poolSize 线程池大小
		 */
		ThreadPool(int poolSize) {
			for (int i = 0; i < poolSize; i++) {
				WorkerThread thread = new WorkerThread(this);

				thread.setName("Worker" + (i + 1));
				thread.start();
				idle.add(thread);
			}
		}

		/**
		 * 获得工作线程
		 * 
		 * Author: chenkangxian
		 *
		 * Last Modification Time: 2011-7-20
		 *
		 * @return
		 */
		WorkerThread getWorker() {
			WorkerThread worker = null;

			synchronized (idle) {
				if (idle.size() > 0) {
					worker = (WorkerThread) idle.remove(0);
				}
			}

			return (worker);
		}

		/**
		 * 送回工作线程
		 * 
		 * Author: chenkangxian
		 *
		 * Last Modification Time: 2011-7-20
		 *
		 * @param worker
		 */
		void returnWorker(WorkerThread worker) {
			synchronized (idle) {
				idle.add(worker);
			}
		}
	}

	private class WorkerThread extends Thread {

		private ByteBuffer buffer = ByteBuffer.allocate(1024);
		private ThreadPool pool;
		private SelectionKey key;

		WorkerThread(ThreadPool pool) {
			this.pool = pool;
		}

		public synchronized void run() {
			System.out.println(this.getName() + " is ready");
			while (true) {
				try {
					this.wait();//等待被notify
				} catch (InterruptedException e) {
					e.printStackTrace();
					this.interrupt();
				}

				if (key == null) {//直到有key
					continue;
				}

				System.out.println(this.getName() + " has been awakened");

				try {
					drainChannel(key);
				} catch (Exception e) {
					System.out.println("Caught '" + e + "' closing channel");

					try {
						key.channel().close();
					} catch (IOException ex) {
						ex.printStackTrace();
					}

					key.selector().wakeup();
				}

				key = null;

				this.pool.returnWorker(this);
			}
		}

		synchronized void serviceChannel(SelectionKey key) {
			this.key = key;
			
			//消除读的关注
			key.interestOps(key.interestOps() &amp; (~SelectionKey.OP_READ));
			this.notify();
		}

		void drainChannel(SelectionKey key) throws Exception {

			SocketChannel channel = (SocketChannel) key.channel();
			int count;

			buffer.clear(); 

			while ((count = channel.read(buffer)) > 0) {
				buffer.flip();

				while (buffer.hasRemaining()) {
					channel.write(buffer);
				}

				buffer.clear();
			}

			if (count < 0) {
				channel.close();
				return;
			}
			
			//重新开始关注读事件
			key.interestOps(key.interestOps() | SelectionKey.OP_READ);

			key.selector().wakeup();
		}

	}

	public static void main(String[] args) throws Exception {

		new SelectSocketsThreadPool().go(args);

	}
}

?
?

package cn.chenkangxian.nioconcurrent;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * 
 * @Project: concurrentnio
 *
 * @Author: chenkangxian
 *
 * @Annotation: 
 *
 * @Date:2011-7-11
 *
 * @Copyright: 2011 chenkangxian, All rights reserved.
 *
 */
public class SelectSockets {

	public static int PORT_NUMBER = 1234;
	
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	
	public static void main(String[] args) throws Exception {

		new SelectSockets().go(args);

	}
	
	public void go(String[] args) throws Exception{
		
		int port = PORT_NUMBER;
//		if(args.length > 0){
//			port = Integer.parseInt(args[0]);
//		}
//		System.out.println("Listening on port " + port);
		
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		
		ServerSocket serverSocket = serverChannel.socket();
		
		Selector selector = Selector.open();
		
		serverSocket.bind(new InetSocketAddress(port));
		
		serverChannel.configureBlocking(false);
		
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		
		while(true){

			int n = selector.select(); //没有轮询,单个selector
			
			if(n == 0){
				continue; 
			}
			
			Iterator it = selector.selectedKeys().iterator();
			
			while(it.hasNext()){
				SelectionKey key = (SelectionKey)it.next();
				
				if(key.isAcceptable()){
					ServerSocketChannel server = 
						(ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					
					registerChannel(selector,channel
							,SelectionKey.OP_READ);
					
					sayHello(channel);
				}
				
				if(key.isReadable()){
					readDataFromSocket(key);
				}
				
				it.remove();
			}
		}
		
	}
	
	
	/**
	 * 在selector上注册channel,并设置interest
	 * 
	 * Author: chenkangxian
	 *
	 * Last Modification Time: 2011-7-11
	 *
	 * @param selector 选择器
	 * 
	 * @param channel 通道
	 * 
	 * @param ops interest
	 * 
	 * @throws Exception
	 */
	protected void registerChannel(Selector selector,
			SelectableChannel channel, int ops) throws Exception{
		
		if(channel == null){
			return ; 
		}
		
		channel.configureBlocking(false);
		
		channel.register(selector, ops);
	}
	
	
	
	
	/**
	 * 处理有可用数据的通道
	 * 
	 * Author: chenkangxian
	 *
	 * Last Modification Time: 2011-7-11
	 *
	 * @param key 可用通道对应的key
	 * 
	 * @throws Exception
	 */
	protected void readDataFromSocket(SelectionKey key) throws Exception{
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		int count;
		
		buffer.clear(); //Empty buffer
		
		while((count = socketChannel.read(buffer)) > 0){
			
			buffer.flip(); 
			
			while(buffer.hasRemaining()){
				socketChannel.write(buffer);
			}
			
			buffer.clear(); 
			
		}
		
		if(count < 0){
			socketChannel.close();
		}
		
	}
	
	/**
	 * 打招呼
	 * 
	 * Author: chenkangxian
	 *
	 * Last Modification Time: 2011-7-11
	 *
	 * @param channel 客户端channel
	 * 
	 * @throws Exception
	 */
	private void sayHello(SocketChannel channel) throws Exception{
		
		buffer.clear();
		buffer.put("Hello 哈罗! \r\n".getBytes());
		buffer.flip();
		
		channel.write(buffer);
	}

}

?
?

?

?
?

 
   
     
        <ul style="display:none;">
         
  • <img src='http://dl.iteye.com/upload/attachment/598672/428eea1f-27fb-3c51-91f0-3a112a20490b-thumb.jpg' class='magplus' title='点击查看原始大小图片' />

  •          
             
  • 大小: 28.7 KB

  •         </ul>
         
       
       
         
       
     
    0
    0
    分享到:
    评论

    相关推荐

      ScalableIOJava(NIO如何实现Selector模式的).pdf

      NIO的核心组件是Selector,它是一个监管多个Channel的多路复用器,能够检测一个或多个Channel是否处于就绪状态,即检查Channel是否有I/O操作可执行。这种模式使得仅需要一个或几个线程就能管理多个Channel,大大降低...

      NIO的工作方式

      NIO的核心概念包括Channel(通道)、Selector(选择器)和Buffer(缓冲区)。Channel类似于BIO中的流,但更具体,可以类比为实际的交通方式,如汽车或高铁,它提供了读写数据的能力。Selector扮演调度者的角色,能够...

      NIO项目源码.zip

      通过分析这个NIO项目源码,你可以学习到如何在实际项目中应用NIO技术,理解其设计模式和优化策略,对于提升Java网络编程的能力大有裨益。深入研究NIO的细节,有助于你在开发高效、高并发的服务时做出明智的选择。

      基于JavaNIO的非阻塞通信的研究与实现

      NIO的主要特点是采用面向缓冲区、通道和选择器的模式,而不是传统的字节流方式。这种方式不仅提高了数据处理速度,而且通过非阻塞通信机制,使得单个线程能够管理更多的连接,极大地优化了资源利用效率。 #### Java...

      java解读NIOSocket非阻塞模式宣贯.pdf

      NIO的核心组件包括通道(Channels)、选择器(Selectors)和缓冲区(Buffers)。在Java NIO中,Socket通信可以采用非阻塞模式,允许一个线程处理多个客户端连接,这对于高并发的网络应用尤其有益。 在给定的示例中...

      Java NIO测试示例

      NIO与传统的 Blocking I/O(阻塞I/O)模式相比,最大的区别在于,NIO不会在等待数据时阻塞线程,而是采用回调或轮询的方式,这样可以更好地利用系统资源,提高并发处理能力。 NIO的核心组件包括通道(Channel)、...

      Java NIO学习资料+代码.zip

      Java NIO的核心组件主要包括通道(Channels)、缓冲区(Buffers)和选择器(Selectors): - **通道(Channels)**:通道是数据传输的途径,类似于传统IO中的流。Java NIO提供了多种通道,如FileChannel用于文件...

      大数据学习之旅——NIO源码

      此外,`java.nio.channels.Selector.select()`方法的实现也非常重要,它会阻塞直到至少有一个通道准备就绪,然后返回就绪通道的数量。 对于大数据处理,NIO的一个典型应用场景是高并发的网络服务器,例如基于NIO的...

      Reactor模式和NIO

      Reactor模式和NIO是两种在网络编程中广泛使用的并发处理技术。Reactor模式是一种事件驱动的设计模式,主要用于解决高并发场景下的系统设计问题,而Java的NIO(Non-blocking Input/Output,非阻塞I/O)是Java平台提供...

      Java NIO非阻塞服务端与客户端相互通信

      - **缓冲区(Buffers)**:NIO中的数据操作都在缓冲区上进行,这是NIO的主要特性之一,缓冲区提供了更高效的数据存取方式。 - **选择器(Selectors)**:选择器允许单个线程监视多个通道,当通道准备好进行读写...

      深入浅出NIO

      3. **Reactor模式**:在NIO中,Reactor模式是一种处理多个客户端连接的并发设计模式。它通过一个线程来监听和接受新的连接,然后分派事件到相应的处理器。相比于传统的多线程模型,Reactor模式可以更有效地管理大量...

      JAVA-NIO-DEMO

      - **选择器使用**:通过Selector监听多个通道的就绪状态,实现多路复用,提高程序并发性能。 - **非阻塞I/O**:展示如何使用非阻塞模式避免线程被长时间阻塞,提高系统资源利用率。 这个Demo将帮助你理解如何在实际...

      java nio 实现socket

      **NIO非阻塞模式**:相比之下,NIO采用了非阻塞模式,即当没有数据可读时,`read()`方法不会阻塞,而是立即返回。这意味着应用程序可以同时处理多个输入/输出操作,而不需要为每个操作分配一个独立的线程。这样的...

      nio demo for nio学习笔记(体系结构以及模块介绍)

      而NIO则采用了非阻塞模式,允许程序在等待数据准备就绪的过程中去做其他事情,提高了程序的效率和并发性。 **NIO的核心组件:** 1. **Channel(通道)**:Channel是数据传输的双向管道,可以同时进行读写操作。例如...

      nio4

      4. **多路复用器模式(Multiplexing)**:Java NIO的这种模式使得服务器可以处理成千上万个并发连接,而不必为每个连接创建单独的线程,避免了线程上下文切换的开销,提升了性能。在高并发场景下,如Web服务器、聊天...

      NIO.rar_NIO_java nio

      选择器通过调用`select()`方法来获取准备就绪的通道,并且可以选择性地处理这些通道。 4. **非阻塞I/O**: 在传统的BIO模型中,如果一个线程正在进行I/O操作时被阻塞,其他线程也无法进行任何操作。而NIO的非阻塞...

      java NIO详细教程

      Java NIO(New IO)是Java平台提供的一种新的IO操作模式,它首次出现在Java 1.4版本中,并在后续版本中不断完善。Java NIO 的设计目的是为了克服传统Java IO API在面对大量并发连接时存在的性能瓶颈。 ##### 使用...

      _NIO原理剖析 (1).pdf

      NIO的核心概念和机制包括阻塞和非阻塞、同步和异步以及IO模型和选择器 Selector。 首先,阻塞与非阻塞关注的是进程访问数据时,数据是否准备就绪的处理方式。在阻塞模式下,当请求的数据没有准备好时,进程会停下来...

      Java NIO实例

      而NIO则是采用非阻塞模式,当数据未准备好时,程序不会等待,而是继续执行其他任务,从而提高了程序的执行效率。 NIO的核心组件包括通道(Channel)、缓冲区(Buffer)和选择器(Selector)。接下来,我们将详细...

      java nio 读文件

      NIO的主要特点是面向缓冲区,非阻塞I/O,以及选择器,这些特性使得NIO在处理大量并发连接时表现出更高的效率。在本篇文章中,我们将深入探讨Java NIO如何读取文件。 一、NIO的基本概念 1. 缓冲区(Buffer):NIO的...

    Global site tag (gtag.js) - Google Analytics