论坛首页 Java企业应用论坛

JavaSE7新特性 异步非阻塞I/O 网络通信 AIO

浏览 9971 次
精华帖 (0) :: 良好帖 (2) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2009-08-13   最后修改:2009-08-14
Asynchronous I/O,异步I/O操作,以Proactor模式为原型设计.在nio中,当有事件发生时,我们会得到通知,然后再去相应的读和写,在aio中,当我们需要的事件完成时才会得到通知,之后可以直接进行业务处理.

Server端
package aio;

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AioTcpServer implements Runnable {
	private AsynchronousChannelGroup asyncChannelGroup;//aio的核心之一通道组.由它负责处理事件,完成之后通知相应的handler
	private AsynchronousServerSocketChannel listener;//端口侦听器

	public AioTcpServer(int port) throws Exception {
		ExecutorService executor = Executors.newFixedThreadPool(20);
		asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
		listener = AsynchronousServerSocketChannel.open(asyncChannelGroup).bind(new InetSocketAddress(port));
	}

	public void run() {
		try {
			Future<AsynchronousSocketChannel> future = listener.accept(listener, new AioAcceptHandler());
			future.get();//此步为阻塞方法,直到有连接上来为止.
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {

		}
	}

	public static void main(String... args) throws Exception {
		AioTcpServer server = new AioTcpServer(9998);
		new Thread(server).start();
	}
}


package aio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class AioAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
	public void cancelled(AsynchronousServerSocketChannel attachment) {
		System.out.println("cancelled");
	}

	public void completed(AsynchronousSocketChannel socket, AsynchronousServerSocketChannel attachment) {
		try {
			attachment.accept(attachment, this);//此方法有点递归的意思.目的是继续侦听端口,由channelGroup负责执行.
			System.out.println("有客户端连接:" + socket.getRemoteAddress().toString());
			startRead(socket);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
		exc.printStackTrace();
	}

	public void startRead(AsynchronousSocketChannel socket) {
		ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
		Future<Integer> future = socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket));
		try {
			future.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
}


package aio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> {
	private AsynchronousSocketChannel socket;

	public AioReadHandler(AsynchronousSocketChannel socket) {
		this.socket = socket;
	}

	public void cancelled(ByteBuffer attachment) {
		System.out.println("cancelled");
	}

	private CharsetDecoder decoder = Charset.forName("GBK").newDecoder();

	public void completed(Integer i, ByteBuffer buf) {
		if (i > 0) {
			buf.flip();
			try {
				System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + decoder.decode(buf));
				buf.compact();
			} catch (CharacterCodingException e) {
				e.printStackTrace();
			} catch (IOException e) {
				e.printStackTrace();
			}
			socket.read(buf, buf, this);
		} else if (i == -1) {
			try {
				System.out.println("客户端断线:" + socket.getRemoteAddress().toString());
				buf = null;
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public void failed(Throwable exc, ByteBuffer buf) {
		System.out.println(exc);
	}
}
   发表时间:2009-08-14   最后修改:2009-08-14
Client端
package aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AioTcpConnector {
	private AsynchronousChannelGroup asyncChannelGroup;
	private AsynchronousSocketChannel connector;

	public AioTcpConnector() throws Exception {
		ExecutorService executor = Executors.newFixedThreadPool(20);
		asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
	}

	private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder();

	public void start(final String ip, final int port) throws Exception {
		Timer timer = new Timer();
		timer.scheduleAtFixedRate(new TimerTask() {
			@Override
			public void run() {
				try {
					if (connector == null || !connector.isOpen()) {
						connector = AsynchronousSocketChannel.open(asyncChannelGroup);
						//connector.setOption(StandardSocketOption.TCP_NODELAY, true);
						//connector.setOption(StandardSocketOption.SO_REUSEADDR, true);
						//connector.setOption(StandardSocketOption.SO_KEEPALIVE, true);
						Future<Void> future = connector.connect(new InetSocketAddress(ip, port));
						try {
							future.get();
							handlerRead();
						} catch (InterruptedException e) {
							e.printStackTrace();
						} catch (ExecutionException e) {
							System.out.println("尝试连接失败!");
						}
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}, 1, 10 * 1000);
	}

	public void handlerRead() throws InterruptedException, ExecutionException {
		try {
			System.out.println("与服务器连接成功:" + connector.getRemoteAddress());
		} catch (IOException e1) {
			e1.printStackTrace();
		}
		final ByteBuffer buf = ByteBuffer.allocate(1024);
		Future<Integer> rows = connector.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
			public void cancelled(ByteBuffer attachment) {
				System.out.println("cancelled");
			}

			public void completed(Integer i, ByteBuffer in) {
				if (i > 0) {
					in.flip();
					try {
						System.out.println("收到" + connector.getRemoteAddress().toString() + "的消息:" + decoder.decode(in));
						in.compact();
					} catch (CharacterCodingException e) {
						e.printStackTrace();
					} catch (IOException e) {
						e.printStackTrace();
					}
					connector.read(in, in, this);
				} else if (i == -1) {
					try {
						System.out.println("与服务器连接断线:" + connector.getRemoteAddress());
						connector.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
					in = null;
				} else if (i == 0) {
					System.out.println(i);
				}
			}

			public void failed(Throwable exc, ByteBuffer buf) {
				System.out.println(exc);
			}
		});
		rows.get();
	}

	public static void main(String... args) throws Exception {
		AioTcpConnector client = new AioTcpConnector();
		client.start("192.168.1.30", 9998);
	}
}
0 请登录后投票
   发表时间:2009-08-21  
7还不稳定。暂时不用。
0 请登录后投票
   发表时间:2009-08-21  
yidao620c 写道
7还不稳定。暂时不用。


装的挺像的啊!
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics