`
lg_asus
  • 浏览: 190940 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

java NIO中的Selector SelectableChannel SelectionKey

 
阅读更多
推荐参考:
http://rox-xmlrpc.sourceforge.net/niotut/index.html

下面是我写的一个小demo:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author chega
 * 
 *         2012-12-3下午6:41:21
 * 
 * 为每个客户端分配一个线程来执行,执行完后客户端连接并不关闭(即长连接)
 */
public class MyServer
{
	Selector							selector;
	Map<SocketChannel, StringBuilder>	channelMap;
	CharsetDecoder						decoder		= Charset.forName("utf-8").newDecoder();
	ExecutorService						executors	= Executors.newCachedThreadPool();
	private MessageFormat				format		= new MessageFormat("{0, time, medium}, {1}");

	public static void main(String... args)
	{
		try
		{
			new MyServer().init();
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}

	private void init() throws IOException
	{
		this.channelMap = Collections.synchronizedMap(new HashMap<SocketChannel, StringBuilder>());

		selector = Selector.open();
		ServerSocketChannel channel = ServerSocketChannel.open();
		channel.configureBlocking(false);
		channel.socket().bind(new InetSocketAddress("localhost", 80));
		print("ServerSocket绑定在本机80端口,等待客户端连接");
		channel.register(selector, SelectionKey.OP_ACCEPT);
		while (true)
		{
			int count = this.selector.select();
			if (count == 0)
			{
				continue;
			}
			Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
			while (it.hasNext())
			{
				final SelectionKey key = it.next();
				it.remove();
				if (!key.isValid())
				{
					continue;
				}
				if (key.isAcceptable())
				{
					this.accept(key);
				}
				else if (key.isReadable())
				{
					this.read(key);
//					Server端用一个线程来处理一个客户端的请求,在read完之后立即向客户端写数据,因此不需要再注册SeletionKey.OP_WRITE
//					由于线程是在线程池中来运行的,因此一定要把当然key的SelectionKey.OP_READ事件给取消掉,否则会多次执行read方法
					key.interestOps(0);
				}
			}
		}
	}

	/**
	 * @param key
	 */
	private void accept(SelectionKey key)
	{
		try
		{
			SocketChannel c = ((ServerSocketChannel) key.channel()).accept();
			c.configureBlocking(false);
			c.register(selector, SelectionKey.OP_READ);
			this.print(c + "连接成功,并注册READ事件");
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}

	/**
	 * @param key
	 */
	private void read(final SelectionKey key)
	{
		this.executors.execute(new Runnable() {
			@Override
			public void run()
			{
				try
				{
					int num = -1;
					SocketChannel c = (SocketChannel) key.channel();
					ByteBuffer buffer = ByteBuffer.allocate(1024);
					while (c.isOpen())
					{
						num = c.read(buffer);
						if (num < 1)
						{
							break;
						}
						buffer.flip();
						MyServer.this.put2Map(c, buffer);
						buffer.clear();
					}
					if (num == -1)
					{
						c.close();
						key.channel();
						print(c + "has been closed");
						MyServer.this.channelMap.remove(c);
						return;
					}
					print("从" + c + "读取数据完毕");

					if (c.isOpen() && MyServer.this.channelMap.get(c) != null && MyServer.this.channelMap.get(c).length() != 0)
					{
						MyServer.this.channelMap.get(c).trimToSize();
						String str = "echo: " + MyServer.this.channelMap.get(c);
						c.write(ByteBuffer.wrap(str.getBytes()));
						print("向" + c + "写数据完毕");
						MyServer.this.put2Map(c, null);
//						由于Selector线程中已经取消了OP_READ事件,因此这里再加上OP_READ
						key.interestOps(SelectionKey.OP_READ);
						key.selector().wakeup();
					}
				}
				catch (IOException e)
				{
					e.printStackTrace();
					key.channel();
					try
					{
						key.channel().close();
					}
					catch (IOException e1)
					{
						e1.printStackTrace();
					}
				}
			}
		});
	}

	private void put2Map(SocketChannel channel, ByteBuffer buffer)
	{
		if (this.channelMap.get(channel) == null)
		{
			this.channelMap.put(channel, new StringBuilder());
		}
		if (buffer == null)
		{
			this.channelMap.get(channel).setLength(0);
			return;
		}
		try
		{
			this.channelMap.get(channel).append(decoder.decode(buffer).toString());
		}
		catch (CharacterCodingException e)
		{
			e.printStackTrace();
		}
	}

	private void print(String msg)
	{
		System.out.println(this.format.format(new Object[] { new Date(), msg }));
	}
}



import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Iterator;

/**
 * chega
 * 2012-11-21下午3:19:52
 */

/**
 * @author chega
 * 
 *         2012-11-21下午3:19:52
 * 
 */
public class MyClient
{
	Selector				selector;
	InetSocketAddress		address;
	ByteBuffer				buffer;
	CharsetDecoder			decoder;
	private MessageFormat	format	= new MessageFormat("{0, time, medium}, {1}");
	final String			TEST	= "ntp";

	public static void main(String[] args) throws IOException
	{
		MyClient t = new MyClient();
		t.selector = Selector.open();
		t.address = new InetSocketAddress("localhost", 80);
		t.buffer = ByteBuffer.allocate(1024);
		t.decoder = Charset.forName("utf-8").newDecoder();
		t.run();
	}

	private void run() throws IOException
	{
		this.createNewConnection();
		while (true)
		{
			try
			{
				int selectedCount = selector.select();
				if (selectedCount == 0)
				{
					continue;
				}
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext())
				{
					SelectionKey key = it.next();
					it.remove();// 从己选择键中删除,否则该键一直存在
					if (!key.isValid())
					{
						continue;
					}
					if (key.isConnectable())
					{
						this.connect(key);
					}
					else if (key.isReadable())
					{
						this.read(key);
					}
					else if (key.isWritable())
					{
						this.write(key);
					}
				}
			}
			catch (IOException e)
			{
				e.printStackTrace();
			}
		}
	}

	/**
	 * @param key
	 * @throws IOException
	 * 
	 */
	private void connect(SelectionKey key)
	{
		try
		{
			while (!((SocketChannel) key.channel()).finishConnect())
			{
				try
				{
					Thread.sleep(100);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
				continue;
			}
			key.interestOps(SelectionKey.OP_WRITE);
			print(key.channel() + "连接服务器成功,并注册WRITE事件");
		}
		catch (IOException e)
		{
			e.printStackTrace();
			print(key.channel() + "连接失败");
			System.out.println(((SocketChannel) key.channel()).socket().getLocalPort());
			System.exit(-1);
		}
	}

	/**
	 * @param key
	 */
	private void write(SelectionKey key)
	{
		try
		{
			// ((SocketChannel) key.channel()).write(ByteBuffer.wrap("GET / HTTP/1.0\r\n\r\n".getBytes()));
			((SocketChannel) key.channel()).write(ByteBuffer.wrap(TEST.getBytes()));
			key.interestOps(SelectionKey.OP_READ);
			print(key.channel() + "向服务器完数据完毕,并注册READ事件");
		}
		catch (IOException e)
		{
			e.printStackTrace();
			print(key.channel() + "写数据发生错误");
			try
			{
				key.channel().close();
			}
			catch (IOException e1)
			{
				e1.printStackTrace();
			}
		}
	}

	/**
	 * @param key
	 */
	private void read(SelectionKey key)
	{
		try
		{
			int num = -1;
			buffer.clear();
			while (key.channel().isOpen())
			{
				num = ((SocketChannel) key.channel()).read(buffer);
				if (num < 1)
				{
					break;
				}
				buffer.flip();
				String result = this.decoder.decode(buffer).toString();
				System.out.println(result);
				buffer.clear();
			}
//			sleep 5s and write again
			Thread.sleep(5000);
			key.interestOps(SelectionKey.OP_WRITE);
		}
		catch (IOException e)
		{
			e.printStackTrace();
			print(key.channel() + "读取数据发生错误");
			try
			{
				key.channel().close();
			}
			catch (IOException e1)
			{
				e1.printStackTrace();
			}
		}
		catch (InterruptedException e)
		{
			e.printStackTrace();
		}
	}

	private void createNewConnection() throws IOException
	{
		SocketChannel channel = SocketChannel.open();
		channel.configureBlocking(false);
		channel.connect(address);
		channel.register(selector, SelectionKey.OP_CONNECT);
	}

	private void print(String msg)
	{
		System.out.println(this.format.format(new Object[] { new Date(), msg }));
	}
}

分享到:
评论

相关推荐

    Java NIO——Selector机制解析三(源码分析)

    本文将深入探讨Java NIO中的Selector机制,并通过源码分析来理解其实现原理。 Selector机制是Java NIO中的核心组件,它允许单线程同时监控多个通道(Channels)的状态变化,例如连接就绪、数据可读或可写等。这种...

    Java_NIO-Selector.rar_java nio_selector

    Selector是Java NIO框架中的核心组件,它使得单个线程能够管理多个通道(Channels),从而提高系统资源利用率并优化性能。下面我们将详细探讨Java NIO中的Selector机制。 1. **Selector的作用** Selector的主要功能...

    java nio Selector的使用-客户端

    Selector是Java NIO中的核心组件之一,它允许单个线程处理多个通道(channels)的读写事件,极大地提高了服务器的并发能力。本篇文章将深入探讨如何在Java NIO中使用Selector处理客户端的I/O请求。 首先,我们需要...

    深入了解java NIO之Selector(选择器)

    Java NIO(Non-Blocking I/O)中,Selector(选择器)是一种重要的组件,它提供了选择执行已经就绪的任务的能力,使得多元 I/O 成为可能。在 Java 中,Selector 是一种抽象类,提供了选择就绪 Task 的能力。 ...

    java nio 实现socket

    **传统阻塞I/O模型**:在传统的Java IO编程中,当我们调用`read()`或`write()`方法时,如果当前没有数据可读或写,那么这些方法将会阻塞,直到有数据可用或者写操作完成。这种阻塞机制会导致大量的线程被占用,从而...

    java NIO详细教程

    - **Selector**:选择器是Java NIO中用于监听多个通道事件的关键组件。通过选择器,一个线程可以同时监听多个通道的事件,如数据可读或可写等,从而实现高效处理多个连接的能力。 #### 二、Channels and Buffers ...

    Java NIO Selector选择器简介.pdf

    **Selector** 是 Java NIO (New I/O) 框架中的一个重要组成部分,主要用于检测一个或多个 **NIO Channel** 的状态,包括但不限于可读、可写、可连接或可接收等状态。它的引入主要是为了克服传统 BIO (Blocking I/O) ...

    Java NIO Socket基本

    Java NIO(New Input/Output)是Java标准库中提供的一种I/O模型,与传统的 Blocking I/O(同步阻塞I/O)相对。NIO在Java 1.4版本引入,其设计目标是提供一种更高效、更灵活的I/O操作方式,特别适合处理大量并发连接...

    java nio Selector的使用-服务器端

    Selector是Java NIO中的核心组件,用于监听多个通道的事件,如连接建立、数据可读、写空等。在服务器端,Selector的应用尤为重要,因为它可以实现单线程处理多个客户端连接,从而提高系统的并发能力。 Selector的...

    Java NIO 中英文版 + Pro Java 7 NIO.2

    Java NIO,全称为Non-Blocking Input/Output(非阻塞输入/输出),是Java平台中用于高效处理I/O操作的重要框架。它在Java 1.4版本中被引入,替代了传统的IO模型,提供了更高级别的I/O操作机制,以适应并发编程的需求...

    Java NIO原理 图文分析及代码实现

    Java NIO(New Input/Output)是在JDK 1.4中引入的一种新的I/O处理方式,它支持非阻塞模式的文件和套接字操作。NIO的关键组件包括缓冲区(Buffers)、通道(Channels)、选择器(Selectors)等。 **Java NIO的主要...

    JavaNIO.zip_java nio_nio java

    Java NIO,全称为Non-Blocking Input/Output(非阻塞输入/输出),是Java从1.4版本开始引入的一个新特性,旨在提供一种更高效、更具选择性的I/O操作方式。相较于传统的IO模型,NIO的核心优势在于其非阻塞特性,允许...

    Java-NIO之Selector.doc

    Java NIO(非阻塞I/O)中的Selector是一个核心组件,它允许单个线程处理多个通道(channels)上的I/O事件。Selector的角色就像一个交通指挥员,能够监控多个通道并决定哪个通道准备好进行读写操作,从而提高系统的...

    JAVA nio的一个简单的例子

    在这个“JAVA nio的一个简单的例子”中,我们将探讨如何使用Java NIO进行简单的服务器-客户端通信,并计算字符串的哈希值。 在传统的BIO模型中,每个连接都需要一个线程来处理,当并发连接数量增加时,系统会创建...

    Java.NIO资源下载资源下载

    Java NIO 是 Java 平台的一个重要特性,首次出现在 Java 1.4 版本中。它为 Java 开发者提供了一套全新的 I/O 处理方式,相比于传统的 I/O API,新版本提供了更高效、更灵活的解决方案。本书《Java™ NIO》由 Ron ...

    【IT十八掌徐培成】Java基础第27天-03.NIO-Selector.zip

    Java NIO(New Input/Output)是...本教程“Java基础第27天-03.NIO-Selector.avi”应该会详细讲解这些概念,并通过实际示例展示如何在Java代码中使用Selector来处理I/O事件,帮助学习者深入理解并掌握这一关键技术。

    nio.rar_NIO_NIO-socket_java nio_java 实例_java.nio

    在Java NIO中,数据是以通道(Channels)和缓冲区(Buffers)的形式进行传输,而不是直接通过流。这种设计使得NIO能够同时处理多个输入/输出操作,从而实现多路复用。 标题“nio.rar_NIO_NIO-socket_java nio_java ...

    Java NIO原理解析

    1. **通道(Channel)**:在Java NIO中,通道代表了一个打开的I/O连接,如文件、套接字、网络流等。通道是双向的,数据可以从通道读取,也可以写入。与传统的流不同,通道可以在任何时候进行读写,而无需预先知道...

Global site tag (gtag.js) - Google Analytics