`
nod0620
  • 浏览: 20114 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

NIO基础

阅读更多

      tomcat集群的时候,在心跳通讯的时候,默认的接收器是NioReceiver,对NIO的使用是个比较经典的例子,在分析NIO之前,先看自己的一个NIO的小例子,代码:

 

public class HelloServer {
	private Selector selector;
	private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
	private String name;

	public HelloServer() throws IOException {
		selector = Selector.open();
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.socket().bind(new InetSocketAddress(8888));
		serverSocketChannel.configureBlocking(false);
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

	}

	public Selector getSelector() {
		return selector;
	}

	public void setSelector(Selector selector) {
		this.selector = selector;
	}

	// 开始监听
	public void listen() {
		try {
			for (;;) {
				// 选择已经准备好的通道,返回通道数,这个值是上次调用select()后到现在这个阶段
                               //已经准备好的通道
				int i = selector.select();
				Iterator iter = selector.selectedKeys().iterator();
				if (iter.hasNext()) {
					SelectionKey selectionKey = (SelectionKey) iter.next();
					// 处理完一个就需要删除一个SelectionKey 不然的话,一直堆积,cpu100%
					iter.remove();
					process(selectionKey);
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public void process(SelectionKey selectionKey) throws IOException {
		// 准备好接受新的套接字连接 这个只有ServerSocketChannel绑定的SelectionKey才有效
		if (selectionKey.isAcceptable()) {
			ServerSocketChannel server = (ServerSocketChannel) selectionKey
					.channel();
			SocketChannel channel = server.accept();
			// 设置非阻塞模式
			channel.configureBlocking(false);
			// 得到client的socket后可以使其在OP_READ状态,下个时段的selector.select()中就是selectionKey.isReadable()的了,可以执行下面这个分支了
			channel.register(selector, SelectionKey.OP_READ);
			System.out.println("accept:"+selectionKey.interestOps()+" "+selectionKey.readyOps());
		} else if (selectionKey.isReadable()) {
			// 可读,下面就是client读数据
			SocketChannel channel = (SocketChannel) selectionKey.channel();
			// int count = 0;
			int count = channel.read(byteBuffer);
			if (count > 0) {
				byte[] bbb = new byte[1024];
				byteBuffer.flip();
				bbb = byteBuffer.array();
				name = new String(bbb);
				System.out.println(name + "aaa");
				byteBuffer.clear();
			}
			// channel.close();这个可以关闭,可以不关闭,一般需要关闭(count==0),不然client的socket堆积的太多
			// channel.register(selector, SelectionKey.OP_WRITE);
			// 可以在注册感兴趣的时间,这个是往客户端写,下个select()
			// 时段就可以往client里面写了
			System.out.println("isReadable:"+selectionKey.interestOps()+" "+selectionKey.readyOps());
		} else if (selectionKey.isWritable()) {
			System.out.println("isWritable:"+selectionKey.interestOps());
			SocketChannel channel = (SocketChannel) selectionKey.channel();
			byteBuffer.reset();
			byteBuffer.put(name == null ? "acb".getBytes() : name.getBytes());
		}
	}
	public static void main(String[] args) {
		int port = 8888;
		try {
			HelloServer server = new HelloServer();
			System.out.println("listening on " + port);
			server.listen();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

 

 

public class HelloClient {
	static InetSocketAddress ip = new InetSocketAddress("localhost", 8888);
	static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
	static class Message implements Runnable {
		protected String name;
		String msg = "";
		public Message(String index) {
			this.name = index;
		}
		public void run() {
			try {
				long start = System.currentTimeMillis();
				// 打开Socket通道
				SocketChannel client = SocketChannel.open();
				// 设置为非阻塞模式
				client.configureBlocking(false);
				// 打开选择器
				Selector selector = Selector.open();
				// 注册连接服务端socket动作
				client.register(selector, SelectionKey.OP_CONNECT);
				// 连接
				client.connect(ip);
				// 分配内存
				ByteBuffer buffer = ByteBuffer.allocate(1);
				int total = 0;

				_FOR: for (;;) {
					selector.select();
					Iterator iter = selector.selectedKeys().iterator();
					while (iter.hasNext()) {
						SelectionKey key = (SelectionKey) iter.next();
						iter.remove();
						if (key.isConnectable()) {
							SocketChannel channel = (SocketChannel) key
									.channel();
							if (channel.isConnectionPending())
								channel.finishConnect();
							
							byte [] aaa = name.getBytes();
							channel.write(buffer.wrap(aaa));
							
							//写完可以监听读,server的响应就可以在下个select()时段检测到了
							channel.register(selector, SelectionKey.OP_READ);
						} else if (key.isReadable()) {
							SocketChannel channel = (SocketChannel) key
									.channel();
							int count = channel.read(buffer);
							if (count > 0) {
								total += count;
								buffer.flip();

								while (buffer.remaining() > 0) {
									byte b = buffer.get();
									msg += (char) b;

								}
								buffer.clear();
							} else {
								client.close();
								break _FOR;
							}
						}
					}
				}
				double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
				System.out.println(msg + "used time :" + last + "s.");
				msg = "";
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws IOException {
		String names[] = new String[10];
		for (int index = 0; index < 10; index++) {
			names[index] = "jeff[" + index + "]";
			new Thread(new Message(names[index])).start();
		}
	}
}

   基本代码解释都有,需要注意的是:

  1  selector的selectedKeys()方法返回的是上次select()方法调用后到现在为止的放入到就绪集里面的SelectionKey

      ,这个跟SelectionKey的interest和ready集合是没有关系的

  2.用完的SelectionKey必须从就绪集里面删除,不然的话会有堆积的

  3.在调用SelectableChannel.register方法时,就是channel和selector进行关联的时候,此时SelectionKey的interest和ready集合是一致的

 

当调用selector的select()方法,查看api知道需要做的事情如下:

 

   1. 将已取消键集中的每个键从所有键集中移除(如果该键是键集的成员),并注销其通道。此步骤使已取消键集成为空集。 

   2. 在开始进行选择操作时,应查询基础操作系统来更新每个剩余通道的准备就绪信息,
以执行由其键的相关集合所标识的任意操作。对于已为至少一个这样的操作准备就绪的通道,
执行以下两种操作之一: 

如果该通道的键尚未在已选择键集中,则将其添加到该集合中,并修改其准备就绪操作集,
以准确地标识那些通道现在已报告为之准备就绪的操作。丢弃准备就绪操作集中以前记录的所有准备就绪信息。 

如果该通道的键已经在已选择键集中,则修改其准备就绪操作集,
以准确地标识所有通道已报告为之准备就绪的新操作。保留准备就绪操作集以前记录的所有准备就绪信息;
换句话说,基础系统所返回的准备就绪操作集是和该键的当前准备就绪操作集按位分开 (bitwise-disjoined) 的。 

 3.如果在此步骤开始时键集中的所有键都有空的相关集合,则不会更新已选择键集和任意键的准备就绪操作集。 
如果在步骤 (2) 的执行过程中要将任意键添加到已取消键集中,则处理过程如步骤 (1)。 

  对应步骤2我的理解是:对应一个selectionKey,如果就绪集合里面有的话,只是把这个key的状态和集合里面这个key已有的状态或操作,集合没有的话就直接set进去.

 下面看NioReceiver的代码:

 NioReceiver实现了Runnable接口,肯定会在线程中被执行,我们看他的run()方法:

    public void run() {
        try {
            listen();
        } catch (Exception x) {
            log.error("Unable to run replication listener.", x);
        }
    }
 

   调用了listen()方法,看代码:

 

 

    protected void listen() throws Exception {
        if (doListen()) {
            log.warn("ServerSocketChannel already started");
            return;
        }
        
        setListen(true);

        while (doListen() && selector != null) {
            // this may block for a long time, upon return the
            // selected set contains keys of the ready channels
            try {
                events();
                socketTimeouts();
                int n = selector.select(getTcpSelectorTimeout());
                if (n == 0) {

                    continue; // nothing to do
                }
                // get an iterator over the set of selected keys
                Iterator it = selector.selectedKeys().iterator();
                // look at each key in the selected set
                while (it.hasNext()) {
                    SelectionKey key = (SelectionKey) it.next();
                    // Is a new connection coming in?
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        channel.socket().setReceiveBufferSize(getRxBufSize());
                        channel.socket().setSendBufferSize(getTxBufSize());
                        channel.socket().setTcpNoDelay(getTcpNoDelay());
                        channel.socket().setKeepAlive(getSoKeepAlive());
                        channel.socket().setOOBInline(getOoBInline());
                        channel.socket().setReuseAddress(getSoReuseAddress());
                        channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
                        channel.socket().setTrafficClass(getSoTrafficClass());
                        channel.socket().setSoTimeout(getTimeout());
                        Object attach = new ObjectReader(channel);
                        registerChannel(selector,
                                        channel,
                                        SelectionKey.OP_READ,
                                        attach);
                    }
                    // is there data to read on this channel?
                    if (key.isReadable()) {
                        readDataFromSocket(key);
                    } else {
                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                    }

                    // remove key from selected set, it's been handled
                    it.remove();
                }
            } catch (java.nio.channels.ClosedSelectorException cse) {
                // ignore is normal at shutdown or stop listen socket
            } catch (java.nio.channels.CancelledKeyException nx) {
                log.warn("Replication client disconnected, error when polling key. Ignoring client.");
            } catch (Throwable x) {
                try {
                    log.error("Unable to process request in NioReceiver", x);
                }catch ( Throwable tx ) {
                    //in case an out of memory error, will affect the logging framework as well
                    tx.printStackTrace();
                }
            }

        }
        serverChannel.close();
        if (selector != null)
            selector.close();
    }

   前面是设置了一些标志位,接下来调用了events()方法,执行NioReplicationTask的里面的一些任务的一个列表。

  每一个event都是Runnable,比较怪的是在events()方法中,执行的是runnable的run()的方法,也不是线程的执行方 法.

  接着是socketTimeouts()方法调用,关闭一些已经超时的channel,接着就是根据SelectionKey的ready集合进行操作,包括注册channel,读channel的数据,之后如果NioReceiver不需要监听,进行一些资源的关闭和回收

 看下readDataFromSocket()方法的定义:

 

    protected void readDataFromSocket(SelectionKey key) throws Exception {
        NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
        if (task == null) {
            // No threads/tasks available, do nothing, the selection
            // loop will keep calling this method until a
            // thread becomes available, the thread pool itself has a waiting mechanism
            // so we will not wait here.
            if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
        } else {
            // invoking this wakes up the worker thread then returns
            //add task to thread pool
            task.serviceChannel(key);
            getExecutor().execute(task);
        }
    }
   

   首先从一个资源池中拿到NioReplicationTask,有可能资源池没有空闲资源,此时返回null,那么这次读就丢失了,不过没有关系,readDataFromSocket()方法是在一个循环中,会一直调用这个方法,只是这次的数据丢失了,如果返回不为空,则执行这个Task.

 

    // loop forever waiting for work to do
    public synchronized void run() { 
        if ( buffer == null ) {
            if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) {
                buffer = ByteBuffer.allocateDirect(getRxBufSize());
            } else {
                buffer = ByteBuffer.allocate(getRxBufSize());
            }
        } else {
            buffer.clear();
        }
        if (key == null) {
            return;	// just in case
        }
        if ( log.isTraceEnabled() ) 
            log.trace("Servicing key:"+key);

        try {
            ObjectReader reader = (ObjectReader)key.attachment();
            
            
            //如果SelectionKey上面没有reader的话,这个SelectionKey可以取消此键的通道到其选择器的注册
            //以及关闭SelectionKey的channel和socket
            if ( reader == null ) {
                if ( log.isTraceEnabled() ) 
                    log.trace("No object reader, cancelling:"+key);
                cancelKey(key);
            } else {
                if ( log.isTraceEnabled() ) 
                    log.trace("Draining channel:"+key);

                drainChannel(key, reader);
            }
        } catch (Exception e) {
            //this is common, since the sockets on the other
            //end expire after a certain time.
            if ( e instanceof CancelledKeyException ) {
                //do nothing
            } else if ( e instanceof IOException ) {
                //dont spew out stack traces for IO exceptions unless debug is enabled.
                if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
                else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
            } else if ( log.isErrorEnabled() ) {
                //this is a real error, log it.
                log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
            } 
            cancelKey(key);
        } finally {

        }
        key = null;
        // done, ready for more, return to pool
        getTaskPool().returnWorker (this);
    }
 

   这个Task里面,最主要的是调用drainChannel()方法,

    protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
    	
    	//从SelectionKey中得到数据
        reader.setLastAccess(System.currentTimeMillis());
        reader.access();
        SocketChannel channel = (SocketChannel) key.channel();
        int count;
        buffer.clear();			// make buffer empty

        // loop while data available, channel is non-blocking
        while ((count = channel.read (buffer)) > 0) {
            buffer.flip();		// make buffer readable
            if ( buffer.hasArray() ) 
                reader.append(buffer.array(),0,count,false);
            else 
                reader.append(buffer,count,false);
            buffer.clear();		// make buffer empty
            //do we have at least one package?
            if ( reader.hasPackage() ) break;
        }

        int pkgcnt = reader.count();
        
        if (count < 0 && pkgcnt == 0 ) {
            //end of stream, and no more packages to process
            remoteEof(key);
            return;
        }
        
        ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
        
        registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
        
        for ( int i=0; i<msgs.length; i++ ) {
            /**
             * Use send ack here if you want to ack the request to the remote 
             * server before completing the request
             * This is considered an asynchronized request
             */
            if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
            try {
                if ( Logs.MESSAGES.isTraceEnabled() ) {
                    try {
                        Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
                    }catch ( Throwable t ) {}
                }
                //process the message  ReceiverBase.messageDataReceived()
                getCallback().messageDataReceived(msgs[i]);
                /**
                 * Use send ack here if you want the request to complete on this 
                 * server before sending the ack to the remote server
                 * This is considered a synchronized request
                 */
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
            }catch ( RemoteProcessException e ) {
                if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
            }catch ( Exception e ) {
                log.error("Processing of cluster message failed.",e);
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
            }
            if ( getUseBufferPool() ) {
                BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
                msgs[i].setMessage(null);
            }
        }                        
        
        if (count < 0) {
            remoteEof(key);
            return;
        }
    }
 

    比较经典的是:在这个Task执行前,传入的SelectionKey的OP_READ事件被屏蔽,在这个方法中数据读完后,重新开时这个事件,这里不是很明白,暂且认为,这样子SelectionKey带的数据不会被下一次的数据所污染,因为下一次通道发的数据在下一次select的时候是不会在SelectionKey的interest集合里面的,也就不可能在ready集合里面了。

最后面只是发送一个ACK的应答数据.

 

 

相比自己写的例子和tomcat的例子,差距在细节,包括异常的处理,各种临界状态的处理,线程的同步和阻塞唤醒。写一个主体常常很容易,但是各个细节的考虑却是功力的体现

 

 

 

 

 

分享到:
评论

相关推荐

    【IT十八掌徐培成】Java基础第26天-04.NIO基础.zip

    在这个课程中,IT十八掌徐培成老师将深入讲解Java NIO的基础知识。 NIO的核心组件主要包括通道(Channel)、缓冲区(Buffer)和选择器(Selector)。首先,通道是数据传输的路径,它可以读写数据,比如文件通道、...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    在这个压缩包中,包含了“Java_NIO基础视频教程”和“MINA视频教程”,你可以通过观看这些教程,了解和学习NIO和Mina的基本用法。同时,“Netty快速入门视频(52im.net).txt”虽然不是直接与Mina相关的,但Netty也是...

    Java NIO基础视频教程、MINA视频教程、Netty快速入门视频 [有源码]-附件资源

    Java NIO基础视频教程、MINA视频教程、Netty快速入门视频 [有源码]-附件资源

    httpcore-nio-4.3.jar包

    2. **Java NIO基础** Java NIO(New IO)是Java 1.4引入的一套新的I/O API,替代了传统的BIO(Blocking IO)。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector)。通道可以读写数据,缓冲区...

    NIO与零拷贝_javanio_nio和零拷贝_

    1. **NIO基础** - **通道(Channels)**:通道类似于流,但它是双向的,可以读也可以写。常见的通道有FileChannel、SocketChannel、ServerSocketChannel等。 - **缓冲区(Buffers)**:NIO的核心组件,用于存储...

    NIO项目源码.zip

    1. NIO基础概念 - Channel(通道):NIO的核心组件,它代表了数据传输的路径,如文件、套接字等。通道是双向的,可以同时进行读写操作。 - Buffer(缓冲区):在NIO中,数据总是先被写入Buffer,然后从Buffer读取...

    《NIO与Socket编程技术指南》_高洪岩

    1. NIO基础:介绍NIO的基本概念,如通道、缓冲区、选择器以及它们之间的交互。 2. 文件操作:讲解如何使用NIO进行文件的读写操作,包括文件通道和MappedByteBuffer的使用。 3. 非阻塞I/O:对比NIO和BIO,解释非阻塞I...

    NIO实现邮件接收原码

    1. **NIO基础** - **通道**:通道是数据传输的路径,它可以读取或写入数据,比如SocketChannel、FileChannel等。 - **缓冲区**:缓冲区是NIO的核心,它是数据的容器,提供了读写数据的方法,并且有自动管理内存的...

    java NIO技巧及原理

    **Java NIO基础概念:** 1. **通道(Channel)**:类似于流,但可以双向传输数据,如FileChannel、SocketChannel等。 2. **缓冲区(Buffer)**:用于在通道和应用程序之间存储数据,提供了更高效的访问方式。 3. **...

    java NIO推送实例

    1. **Java NIO基础** - **通道(Channels)**:Java NIO 提供了多种通道,如文件通道、套接字通道等,它们代表不同类型的I/O操作。 - **缓冲区(Buffers)**:数据在通道和应用程序之间传输时会存储在缓冲区中,...

    NIO trick and trap NIO网络

    #### NIO基础概览 - **变迁历程**:NIO的概念最早由JSR 51提出,在JDK 1.4中首次引入。随后随着JSR 203的发展,NIO 2.0在JDK 7中正式发布。 - **核心组件**:主要包括Buffers(缓冲区)、Channels(通道)和...

    java nio im(server+client)

    1. **NIO基础概念** - **通道(Channel)**:在NIO中,数据是通过通道进行传输的。通道类似于流,但可以同时进行读写操作。 - **缓冲区(Buffer)**:数据在传输前会先存储在缓冲区中,缓冲区提供了一种方式来管理...

    java-nio.rar_NIO_java nio

    1. **NIO基础** - **通道(Channels)**:NIO中的通道类似于流,但它们是双向的,可以读写数据。常见的通道类有FileChannel、SocketChannel和ServerSocketChannel等。 - **缓冲区(Buffers)**:缓冲区是数据的...

    Java NIO非阻塞服务端与客户端相互通信

    1. **Java NIO基础** - **通道(Channels)**:NIO中的通道类似于传统IO的流,但它们可以同时读写,并且支持非阻塞操作。 - **缓冲区(Buffers)**:NIO中的数据操作都在缓冲区上进行,这是NIO的主要特性之一,...

    java nio 聊天室源码

    1. **Java NIO基础** - **通道(Channel)**:在NIO中,数据是通过通道进行传输的,如SocketChannel、ServerSocketChannel、FileChannel等。它们是双向的,可以读也可以写。 - **缓冲区(Buffer)**:NIO的核心组件,...

    nio入门文档及示例代码

    一、NIO基础概念 1. **通道(Channels)**:NIO的核心组件之一,它是连接到数据源(如文件、套接字)的通道,可以读写数据。常见的通道类有FileChannel、SocketChannel、ServerSocketChannel等。 2. **缓冲区...

    基于NIO的群聊.zip

    1. **NIO基础概念**: - **通道(Channels)**:NIO的核心组件之一,它提供了数据传输的路径。Java NIO提供多种类型的通道,如SocketChannel、ServerSocketChannel、FileChannel等。 - **缓冲区(Buffers)**:...

    NIO按行读取数据

    1. **NIO基础知识**: - **FileInputStream**:这是Java IO中的类,用于读取文件内容。 - **FileChannel**:NIO中的核心组件,允许通过通道进行高效的数据传输。 - **MappedByteBuffer**:文件映射缓冲区,它允许...

Global site tag (gtag.js) - Google Analytics