浏览 9974 次
精华帖 (0) :: 良好帖 (2) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2009-08-13
最后修改:2009-08-14
Server端 package aio; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AioTcpServer implements Runnable { private AsynchronousChannelGroup asyncChannelGroup;//aio的核心之一通道组.由它负责处理事件,完成之后通知相应的handler private AsynchronousServerSocketChannel listener;//端口侦听器 public AioTcpServer(int port) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(20); asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor); listener = AsynchronousServerSocketChannel.open(asyncChannelGroup).bind(new InetSocketAddress(port)); } public void run() { try { Future<AsynchronousSocketChannel> future = listener.accept(listener, new AioAcceptHandler()); future.get();//此步为阻塞方法,直到有连接上来为止. } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { } } public static void main(String... args) throws Exception { AioTcpServer server = new AioTcpServer(9998); new Thread(server).start(); } } package aio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class AioAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> { public void cancelled(AsynchronousServerSocketChannel attachment) { System.out.println("cancelled"); } public void completed(AsynchronousSocketChannel socket, AsynchronousServerSocketChannel attachment) { try { attachment.accept(attachment, this);//此方法有点递归的意思.目的是继续侦听端口,由channelGroup负责执行. System.out.println("有客户端连接:" + socket.getRemoteAddress().toString()); startRead(socket); } catch (IOException e) { e.printStackTrace(); } } public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) { exc.printStackTrace(); } public void startRead(AsynchronousSocketChannel socket) { ByteBuffer clientBuffer = ByteBuffer.allocate(1024); Future<Integer> future = socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket)); try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } package aio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel socket; public AioReadHandler(AsynchronousSocketChannel socket) { this.socket = socket; } public void cancelled(ByteBuffer attachment) { System.out.println("cancelled"); } private CharsetDecoder decoder = Charset.forName("GBK").newDecoder(); public void completed(Integer i, ByteBuffer buf) { if (i > 0) { buf.flip(); try { System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + decoder.decode(buf)); buf.compact(); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } socket.read(buf, buf, this); } else if (i == -1) { try { System.out.println("客户端断线:" + socket.getRemoteAddress().toString()); buf = null; } catch (IOException e) { e.printStackTrace(); } } } public void failed(Throwable exc, ByteBuffer buf) { System.out.println(exc); } } 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2009-08-14
最后修改:2009-08-14
Client端
package aio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AioTcpConnector { private AsynchronousChannelGroup asyncChannelGroup; private AsynchronousSocketChannel connector; public AioTcpConnector() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(20); asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor); } private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder(); public void start(final String ip, final int port) throws Exception { Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (connector == null || !connector.isOpen()) { connector = AsynchronousSocketChannel.open(asyncChannelGroup); //connector.setOption(StandardSocketOption.TCP_NODELAY, true); //connector.setOption(StandardSocketOption.SO_REUSEADDR, true); //connector.setOption(StandardSocketOption.SO_KEEPALIVE, true); Future<Void> future = connector.connect(new InetSocketAddress(ip, port)); try { future.get(); handlerRead(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { System.out.println("尝试连接失败!"); } } } catch (IOException e) { e.printStackTrace(); } } }, 1, 10 * 1000); } public void handlerRead() throws InterruptedException, ExecutionException { try { System.out.println("与服务器连接成功:" + connector.getRemoteAddress()); } catch (IOException e1) { e1.printStackTrace(); } final ByteBuffer buf = ByteBuffer.allocate(1024); Future<Integer> rows = connector.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() { public void cancelled(ByteBuffer attachment) { System.out.println("cancelled"); } public void completed(Integer i, ByteBuffer in) { if (i > 0) { in.flip(); try { System.out.println("收到" + connector.getRemoteAddress().toString() + "的消息:" + decoder.decode(in)); in.compact(); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } connector.read(in, in, this); } else if (i == -1) { try { System.out.println("与服务器连接断线:" + connector.getRemoteAddress()); connector.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } else if (i == 0) { System.out.println(i); } } public void failed(Throwable exc, ByteBuffer buf) { System.out.println(exc); } }); rows.get(); } public static void main(String... args) throws Exception { AioTcpConnector client = new AioTcpConnector(); client.start("192.168.1.30", 9998); } } |
|
返回顶楼 | |
发表时间:2009-08-21
7还不稳定。暂时不用。
|
|
返回顶楼 | |
发表时间:2009-08-21
yidao620c 写道 7还不稳定。暂时不用。
装的挺像的啊! |
|
返回顶楼 | |