`

java7 NIO2(8)The Asynchronous Channel API 异步通道API

    博客分类:
  • java
 
阅读更多

 

异步channel API

主要引入三个异步类: AsynchronousFileChannel,AsynchronousSocketChannel, and AsynchronousServerSocketChannel.

AsynchronousFileChannel跟FileChannel区别:不保存全局的position和offset,可以制定访问位置,也支持并发访问文件不同。
AsynchronousServerSocketChannel AsynchronousSocketChannel:能够绑定到一个制定线程池的组中,这个线程池能够用future或者CompletionHandler来对执行结果进行处理,
AsynchronousChannelGroup:执行异步IO的java线程池的组类,
AsynchronousChannelGroup.java:
public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,int initialSize)
public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)​​​
我们看使用示例
package com.mime;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

public class NIO2AsynchronousFileChannel {
	public static void main(String[] args) {
		
		asyFile();
		asyFileChannel2();
		asyServerSocketChannel();
	}

	// 异步文件读写示例
	public static void asyFile() {
		ByteBuffer buffer = ByteBuffer.allocate(100);
		String encoding = System.getProperty("file.encoding");
		Path path = Paths.get("/tmp", "store.txt");
		try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel
				.open(path, StandardOpenOption.READ)) {
			Future<Integer> result = asynchronousFileChannel.read(buffer, 0);
			// 读超时控制
			// int count = result.get(100, TimeUnit.NANOSECONDS);

			while (!result.isDone()) {
				System.out.println("Do something else while reading ...");
			}
			System.out.println("Read done: " + result.isDone());
			System.out.println("Bytes read: " + result.get());

			// 使用CompletionHandler回调接口异步读取文件
			final Thread current = Thread.currentThread();
			asynchronousFileChannel.read(buffer, 0,
					"Read operation status ...",
					new CompletionHandler<Integer, Object>() {
						@Override
						public void completed(Integer result, Object attachment) {
							System.out.println(attachment);
							System.out.print("Read bytes: " + result);
							current.interrupt();
						}

						@Override
						public void failed(Throwable exc, Object attachment) {
							System.out.println(attachment);
							System.out.println("Error:" + exc);
							current.interrupt();
						}
					});

		} catch (Exception ex) {
			System.err.println(ex);
		}
		buffer.flip();
		System.out.print(Charset.forName(encoding).decode(buffer));
		buffer.clear();

		// 异步文件写示例
		ByteBuffer buffer1 = ByteBuffer
				.wrap("The win keeps Nadal at the top of the heap in men's"
						.getBytes());
		Path path1 = Paths.get("/tmp", "store.txt");
		try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel
				.open(path1, StandardOpenOption.WRITE)) {
			Future<Integer> result = asynchronousFileChannel
					.write(buffer1, 100);
			while (!result.isDone()) {
				System.out.println("Do something else while writing ...");
			}
			System.out.println("Written done: " + result.isDone());
			System.out.println("Bytes written: " + result.get());

			// file lock
			Future<FileLock> featureLock = asynchronousFileChannel.lock();
			System.out.println("Waiting for the file to be locked ...");
			FileLock lock = featureLock.get();
			if (lock.isValid()) {
				Future<Integer> featureWrite = asynchronousFileChannel.write(
						buffer, 0);
				System.out.println("Waiting for the bytes to be written ...");
				int written = featureWrite.get();
				// or, use shortcut
				// int written = asynchronousFileChannel.write(buffer,0).get();
				System.out.println("I’ve written " + written + " bytes into "
						+ path.getFileName() + " locked file!");
				lock.release();
			}

			// asynchronousFileChannel.lock("Lock operation status:", new
			// CompletionHandler<FileLock, Object>() ;

		} catch (Exception ex) {
			System.err.println(ex);
		}
	}

	// public static AsynchronousFileChannel open(Path file, Set<? extends
	// OpenOption> options,ExecutorService executor, FileAttribute<?>... attrs)
	// throws IOException
	private static Set withOptions() {
		final Set options = new TreeSet<>();
		options.add(StandardOpenOption.READ);
		return options;
	}

	// 使用AsynchronousFileChannel.open(path, withOptions(),
	// taskExecutor))这个API对异步文件IO的处理
	public static void asyFileChannel2() {
		final int THREADS = 5;
		ExecutorService taskExecutor = Executors.newFixedThreadPool(THREADS);
		String encoding = System.getProperty("file.encoding");
		List<Future<ByteBuffer>> list = new ArrayList<>();
		int sheeps = 0;
		Path path = Paths.get("/tmp",
				"store.txt");
		try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel
				.open(path, withOptions(), taskExecutor)) {
			for (int i = 0; i < 50; i++) {
				Callable<ByteBuffer> worker = new Callable<ByteBuffer>() {
					@Override
					public ByteBuffer call() throws Exception {
						ByteBuffer buffer = ByteBuffer
								.allocateDirect(ThreadLocalRandom.current()
										.nextInt(100, 200));
						asynchronousFileChannel.read(buffer, ThreadLocalRandom
								.current().nextInt(0, 100));
						return buffer;
					}
				};
				Future<ByteBuffer> future = taskExecutor.submit(worker);
				list.add(future);
			}
			// this will make the executor accept no new threads
			// and finish all existing threads in the queue
			taskExecutor.shutdown();
			// wait until all threads are finished
			while (!taskExecutor.isTerminated()) {
				// do something else while the buffers are prepared
				System.out
						.println("Counting sheep while filling up some buffers!So far I counted: "
								+ (sheeps += 1));
			}
			System.out.println("\nDone! Here are the buffers:\n");
			for (Future<ByteBuffer> future : list) {
				ByteBuffer buffer = future.get();
				System.out.println("\n\n" + buffer);
				System.out
						.println("______________________________________________________");
				buffer.flip();
				System.out.print(Charset.forName(encoding).decode(buffer));
				buffer.clear();
			}
		} catch (Exception ex) {
			System.err.println(ex);
		}
	}

	//异步server socket channel io处理示例
	public static void asyServerSocketChannel() {
		
		//使用threadGroup
//		AsynchronousChannelGroup threadGroup = null;
//		ExecutorService executorService = Executors
//		.newCachedThreadPool(Executors.defaultThreadFactory());
//		try {
//		threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//		} catch (IOException ex) {
//		System.err.println(ex);
//		}
//		AsynchronousServerSocketChannel asynchronousServerSocketChannel =
//				AsynchronousServerSocketChannel.open(threadGroup);
		
		final int DEFAULT_PORT = 5555;
		final String IP = "127.0.0.1";
		ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors
				.defaultThreadFactory());
		// create asynchronous server socket channel bound to the default group
		try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel
				.open()) {
			if (asynchronousServerSocketChannel.isOpen()) {
				// set some options
				asynchronousServerSocketChannel.setOption(
						StandardSocketOptions.SO_RCVBUF, 4 * 1024);
				asynchronousServerSocketChannel.setOption(
						StandardSocketOptions.SO_REUSEADDR, true);
				// bind the server socket channel to local address
				asynchronousServerSocketChannel.bind(new InetSocketAddress(IP,
						DEFAULT_PORT));
				// display a waiting message while ... waiting clients
				System.out.println("Waiting for connections ...");
				while (true) {
					Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel.accept();
					//使用CompletionHandler来处理IO事件
//					asynchronousServerSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() 
					//client使用CompletionHandler来处理IO事件
					//asynchronousSocketChannel.connect(new InetSocketAddress(IP, DEFAULT_PORT), null,new CompletionHandler<Void, Void>() 
					try {
						final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
								.get();
						Callable<String> worker = new Callable<String>() {
							@Override
							public String call() throws Exception {
								String host = asynchronousSocketChannel
										.getRemoteAddress().toString();
								System.out.println("Incoming connection from: "
										+ host);
								final ByteBuffer buffer = ByteBuffer
										.allocateDirect(1024);
								// transmitting data
								while (asynchronousSocketChannel.read(buffer)
										.get() != -1) {
									buffer.flip();
								}
								asynchronousSocketChannel.write(buffer).get();
								if (buffer.hasRemaining()) {
									buffer.compact();
								} else {
									buffer.clear();
								}
								asynchronousSocketChannel.close();
								System.out.println(host
										+ " was successfully served!");
								return host;
							}
						};
						taskExecutor.submit(worker);
					} catch (InterruptedException | ExecutionException ex) {
						System.err.println(ex);
						System.err.println("\n Server is shutting down ...");
						// this will make the executor accept no new threads
						// and finish all existing threads in the queue
						taskExecutor.shutdown();
						// wait until all threads are finished
						while (!taskExecutor.isTerminated()) {
						}
						break;
					}
				}
			} else {
				System.out
						.println("The asynchronous server-socket channel cannot be opened!");
			}
		} catch (IOException ex) {
			System.err.println(ex);
		}

	}
}
 输出:
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Do something else while reading ...
Read done: true
Bytes read: 18
hello,filechannel
Read operation status ...
Error:java.nio.channels.AsynchronousCloseException
Do something else while writing ...
Do something else while writing ...
Do something else while writing ...
Written done: true
Bytes written: 51
Waiting for the file to be locked ...
Waiting for the bytes to be written ...
I’ve written 100 bytes into store.txt locked file!
Counting sheep while filling up some buffers!So far I counted: 1
Counting sheep while filling up some buffers!So far I counted: 2
Counting sheep while filling up some buffers!So far I counted: 3
Counting sheep while filling up some buffers!So far I counted: 4
Counting sheep while filling up some buffers!So far I counted: 5
Counting sheep while filling up some buffers!So far I counted: 6
Counting sheep while filling up some buffers!So far I counted: 7
Counting sheep while filling up some buffers!So far I counted: 8
Counting sheep while filling up some buffers!So far I counted: 9
Counting sheep while filling up some buffers!So far I counted: 10
Counting sheep while filling up some buffers!So far I counted: 11

Done! Here are the buffers:

java.lang.InterruptedException
Waiting for connections ...
 
分享到:
评论
1 楼 chenhailong 2013-05-27  
恩,不错,不过现在JDK 7 好像没普及啊。
嗨 服务器端基本都不会用这样的

相关推荐

    Java.NIO资源下载资源下载

    本书《Java™ NIO》由 Ron Hitchens 编写,出版社为 O'Reilly,出版于2002年8月,ISBN号为0-596-00288-2,全书共有312页。 ### Java NIO 的优势 本书深入探讨了 Java 1.4 版本中的新 I/O 功能,并通过具体示例展示...

    NIO.2 入门,第 1 部分: 异步通道 API

    首先,异步通道(AsynchronousChannel)是NIO.2的核心概念,它提供了异步读写操作的能力。与传统的阻塞I/O相比,异步I/O允许我们在等待数据时执行其他任务,从而避免了线程的阻塞。AsynchronousSocketChannel和...

    Java-NIO2教程

    - **JDK1.7至今**: 随着Java 7的发布,NIO2(New I/O version 2)作为JSR-203的一部分被引入。NIO2提供了一套新的API,支持更为高效的异步I/O操作,并且引入了强大的文件系统API,包括`Path`、`Files`等类,使得文件...

    JAVA NIO 异步通信客户端

    2. **Selectors**: Selector是NIO中的多路复用器,它可以监控多个Channel,当它们准备就绪进行读写操作时,Selector会返回这些通道的键集。这样,单个线程就可以管理多个连接,提高了系统资源利用率。 3. **Buffers...

    java nio 异步编程源码

    1. **AsynchronousChannel**:异步通道,提供了异步读写操作。 2. **AsynchronousFileChannel**:用于异步文件操作。 3. **AsynchronousServerSocketChannel**:用于异步地接收连接请求。 4. **...

    用java的nio技术实现的异步连接池

    5. **Asynchronous Channel Group**:在Java NIO.2中,引入了异步通道组(AsynchronousChannelGroup),用于管理一组异步通道,可以控制它们的执行策略,比如限制并发连接的数量。 6. **CompletionHandler**:异步...

    Java IO, NIO and NIO.2

    Java NIO.2在JDK 7中引入,又称为JSR 203。NIO.2进一步改进了Java的I/O能力,主要包括以下三个方面: 1. 改进的文件系统接口(File API):引入了java.nio.file包,其中包括Path、Paths、Files类,提供了更为现代和...

    Java NIO测试示例

    Java NIO还引入了异步文件通道,支持异步读写文件。通过Future和CompletableFuture,可以方便地进行异步操作,并获取结果。 7. **字符集转换**: NIO提供了Charset类来处理字符编码和解码,支持多种字符集,如UTF...

    基于java的BIO、NIO、AIO通讯模型代码实现

    AIO,也称为NIO 2,是从Java 7开始引入的。与NIO不同,AIO是真正的异步I/O模型,它允许应用在发起读写操作后立即返回,而无需等待操作完成。操作系统会在数据准备好或写入完成时通过回调通知应用。这种模式进一步...

    Reilly__Java_NIO英文版和中文版

    10. **异步I/O(Asynchronous File Channel)**:Java 7引入,允许异步读写文件,进一步提升了I/O操作的效率。 通过阅读Reilly的《Java NIO》,开发者可以掌握如何设计和实现高效的并发I/O程序,解决传统阻塞I/O...

    java nio教程pdf

    Java NIO(New IO,也称为Non-Blocking IO)是一种基于通道(Channel)和缓冲区(Buffer)的I/O操作方法,用于替代标准Java IO API。Java NIO提供了与标准IO不同的I/O工作方式,它是面向缓冲区、基于通道的I/O操作,...

    JavaNIO想详解1

    NIO的核心概念包括通道(Channel)和缓冲区(Buffer)。 通道是NIO中数据传输的路径,它可以从一个源头读取数据,或者向一个目的地写入数据。Java NIO提供了多种类型的通道,如FileChannel用于文件操作,...

    java IO、NIO、AIO详解.docx

    Java AIO(Asynchronous I/O)是 Java 语言中最新的输入/输出机制,使用异步 IO 模式实现输入/输出操作。在 AIO 中,输入/输出操作是异步式的,即应用程序可以继续执行其他任务,而不需要等待输入/输出操作的完成。...

    Java通讯模型-BIO、NIO、AIO综合演练

    1. **概念**:AIO(也称为NIO 2)是Java 7引入的,进一步优化了非阻塞I/O,提供了异步读写能力,用户可以注册事件监听,当数据准备好时,系统会通知用户进行处理。 2. **优点**:真正实现了异步,避免了线程阻塞,...

    NIO网络通讯编程

    8. NIO 2.0(New I/O 2.0,Java 7引入):在Java 7中,NIO得到了进一步增强,增加了Asynchronous Channel Group和AsynchronousSocketChannel等异步I/O功能,使得开发者能够更加方便地编写高性能的并发程序。...

    Java 对象序列化 NIO NIO2详细介绍及解析

    NIO2是Java 7引入的进一步扩展,增加了文件系统查询、文件监听、异步I/O等功能。NIO2的主要改进包括: 1. **Path API**:提供了一种抽象表示文件路径的方式,使得跨平台操作更加方便。 2. **Files API**:提供了...

    NIO相关代码和文档资料

    6. **异步I/O(Asynchronous File Channel)**:Java NIO2引入了异步文件通道,支持异步读写文件,使得I/O操作可以在后台线程中完成,主线程无需等待。 7. **管道(Pipes)**:管道用于在同一JVM内的两个线程之间...

Global site tag (gtag.js) - Google Analytics