`
shihuan830619
  • 浏览: 582978 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

JDK1.7 AIO

    博客分类:
  • J2SE
阅读更多
工程结构图:


Helper.java文件内容如下:
package com.shihuan.aio.util;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Helper {

	private static BlockingQueue<String> words;
    private static Random random;
     
    public Helper() throws InterruptedException{
        words = new ArrayBlockingQueue<String>(5);
        words.put("hi");
        words.put("who");
        words.put("what");
        words.put("where");
        words.put("bye");  
         
        random = new Random();
    }
     
    public String getWord(){
        return words.poll();
    }
 
    public void sleep() {
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(3));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }  
     
    public static void sleep(long l) {
        try {
            TimeUnit.SECONDS.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
     
    public static String getAnswer(String question){
        String answer = null;
         
        switch(question){
        case "who":
            answer = "我是小娜\n";
            break;
        case "what":
            answer = "我是来帮你解闷的\n";
            break;
        case "where":
            answer = "我来自外太空\n";
            break;
        case "hi":
            answer = "hello\n";
            break;
        case "bye":
            answer = "88\n";
            break;
        default:
                answer = "请输入 who, 或者what, 或者where";
        }
         
        return answer;
    }
	
}


CharsetHelper.java文件内容如下:
package com.shihuan.aio.util;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

public class CharsetHelper {

	private static final String UTF_8 = "UTF-8";
    private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
    private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
     
    public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException {
        return encoder.encode(in);
    }
 
    public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException {
        return decoder.decode(in);
    }
	
}


SocketServer.java文件内容如下:
package com.shihuan.aio.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;

import com.shihuan.aio.util.Helper;

public class SocketServer {

	private final AsynchronousServerSocketChannel server;
	//写队列,因为当前一个异步写调用还没完成之前,调用异步写会抛WritePendingException
    //所以需要一个写队列来缓存要写入的数据,这是AIO比较坑的地方
    private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
    private boolean writing = false;
	
	public SocketServer() throws IOException {
		//设置线程数为CPU核数
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
        server = AsynchronousServerSocketChannel.open(channelGroup);
        //重用端口
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        //绑定端口并设置连接请求队列长度
        server.bind(new InetSocketAddress(8383), 80);
	}

	public void listen() {
        System.out.println(Thread.currentThread().getName() + ": run in listen method" );
        //开始接受第一个连接请求
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){                       
            @Override
            public void completed(AsynchronousSocketChannel channel, Object attachment) {
                System.out.println(Thread.currentThread().getName() + ": run in accept completed method" );
                 
                //先安排处理下一个连接请求,异步非阻塞调用,所以不用担心挂住了
                //这里传入this是个地雷,小心多线程
                server.accept(null, this);
                //处理连接读写
                handle(channel);
            }
 
            private void handle(final AsynchronousSocketChannel channel) {
                System.out.println(Thread.currentThread().getName() + ": run in handle method" );
                //每个AsynchronousSocketChannel,分配一个缓冲区
                final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
                readBuffer.clear();
                channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){
 
                    @Override
                    public void completed(Integer count, Object attachment) {
                        System.out.println(Thread.currentThread().getName() + ": run in read completed method" );  
                         
                        if(count > 0){
                            try{
                                readBuffer.flip();
                                //CharBuffer charBuffer = CharsetHelper.decode(readBuffer); 
                                CharBuffer charBuffer = Charset.forName("UTF-8").newDecoder().decode(readBuffer);
                                String question = charBuffer.toString(); 
                                String answer = Helper.getAnswer(question);
                                /*//写入也是异步调用,也可以使用传入CompletionHandler对象的方式来处理写入结果
                                //channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));                            
                                try{
                                    channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
                                }
                                //Unchecked exception thrown when an attempt is made to write to an asynchronous socket channel and a previous write has not completed.
                                //看来操作系统也不可靠
                                catch(WritePendingException wpe){
                                    //休息一秒再重试,如果失败就不管了
                                    Helper.sleep(1);
                                    channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
                                }*/
                                writeStringMessage(channel, answer);
                                 
                                readBuffer.clear();
                            }
                            catch(IOException e){
                                e.printStackTrace();
                            }
                        }
                        else{
                            try {
                                //如果客户端关闭socket,那么服务器也需要关闭,否则浪费CPU
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                         
                        //异步调用OS处理下个读取请求
                        //这里传入this是个地雷,小心多线程
                        channel.read(readBuffer, null, this);
                    }
 
                    /**
                     * 服务器读失败处理
                     * @param exc
                     * @param attachment
                     */
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("server read failed: " + exc);         
                        if(channel != null){
                            try {
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                     
                });                            
            }
 
            /**
             * 服务器接受连接失败处理
             * @param exc
             * @param attachment
             */
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("server accept failed: " + exc);
            }
             
        });
    }
     
    /**
     * Enqueues a write of the buffer to the channel.
     * The call is asynchronous so the buffer is not safe to modify after
     * passing the buffer here.
     *
     * @param buffer the buffer to send to the channel
     */
    private void writeMessage(final AsynchronousSocketChannel channel, final ByteBuffer buffer) {
        boolean threadShouldWrite = false;
 
        synchronized(queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }
 
        if (threadShouldWrite) {
            writeFromQueue(channel);
        }
    }
 
    private void writeFromQueue(final AsynchronousSocketChannel channel) {
        ByteBuffer buffer;
 
        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }
 
        // No new data in buffer to write
        if (writing) {
            writeBuffer(channel, buffer);
        }
    }
 
    private void writeBuffer(final AsynchronousSocketChannel channel, ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue(channel);
                }
            }
 
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("server write failed: " + exc);
            }
        });
    }
    
    /**
     * Sends a message
     * @param string the message
     * @throws CharacterCodingException 
     */
    private void writeStringMessage(final AsynchronousSocketChannel channel, String msg) throws CharacterCodingException {
        writeMessage(channel, Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
    }

	public static void main(String[] args) throws IOException {
		SocketServer socketServer = new SocketServer();
		socketServer.listen();
	}

}


SocketClient.java文件内容如下:
package com.shihuan.aio.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

import com.shihuan.aio.util.CharsetHelper;
import com.shihuan.aio.util.Helper;

public class SocketClient implements Runnable {

	private AsynchronousSocketChannel channel;
    private Helper helper;
    private CountDownLatch latch;
    private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
    private boolean writing = false;
	
	public SocketClient(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException {
		this.latch = latch;
        helper = new Helper();
        initChannel(channelGroup);
	}
	
	private void initChannel(AsynchronousChannelGroup channelGroup) throws IOException {
        //在默认channel group下创建一个socket channel
        channel = AsynchronousSocketChannel.open(channelGroup);
        //设置Socket选项
        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    }
	
	private void shutdown() throws IOException {
        if(channel != null){
            channel.close();
        }
         
        latch.countDown();                         
    }
     
    /**
     * Enqueues a write of the buffer to the channel.
     * The call is asynchronous so the buffer is not safe to modify after
     * passing the buffer here.
     *
     * @param buffer the buffer to send to the channel
     */
    private void writeMessage(final ByteBuffer buffer) {
        boolean threadShouldWrite = false;
 
        synchronized(queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }
 
        if (threadShouldWrite) {
            writeFromQueue();
        }
    }
 
    private void writeFromQueue() {
        ByteBuffer buffer;
 
        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }
 
        // No new data in buffer to write
        if (writing) {
            writeBuffer(buffer);
        }
    }
 
    private void writeBuffer(ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue();
                }
            }
 
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
            }
        });
    }
 
    /**
     * Sends a message
     * @param string the message
     * @throws CharacterCodingException 
     */
    public void writeStringMessage(String msg) throws CharacterCodingException {
        writeMessage(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
    }

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "---run");
        
        //连接服务器
        channel.connect(new InetSocketAddress("localhost", 8383), null, new CompletionHandler<Void, Void>(){
            final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
             
            @Override
            public void completed(Void result, Void attachment) {
                //连接成功后, 异步调用OS向服务器写一条消息
                try {
                    //channel.write(CharsetHelper.encode(CharBuffer.wrap(helper.getWord())));
                    writeStringMessage(helper.getWord());
                } catch (CharacterCodingException e) {
                    e.printStackTrace();
                }
                 
                //helper.sleep();//等待写异步调用完成
                readBuffer.clear();
                //异步调用OS读取服务器发送的消息
                channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){
 
                    @Override
                    public void completed(Integer result, Object attachment) {
                        try{
                            //异步读取完成后处理
                            if(result > 0){
                                readBuffer.flip();
                                CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
                                String answer = charBuffer.toString(); 
                                System.out.println(Thread.currentThread().getName() + "---" + answer);
                                readBuffer.clear();
                                 
                                String word = helper.getWord();
                                if(word != null){
                                    //异步写
                                    //channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
                                    writeStringMessage(word);
                                    //helper.sleep();//等待异步操作
                                    channel.read(readBuffer, null, this);
                                }
                                else{
                                    //不想发消息了,主动关闭channel
                                    shutdown();
                                }
                            }
                            else{
                                //对方已经关闭channel,自己被动关闭,避免空循环
                                shutdown();
                            }                                                      
                        }
                        catch(Exception e){
                            e.printStackTrace();
                        }                      
                    }                  
 
                    /**
                     * 读取失败处理
                     * @param exc
                     * @param attachment
                     */
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("client read failed: " + exc);
                        try {
                            shutdown();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                     
                });
            }
 
            /**
             * 连接失败处理
             * @param exc
             * @param attachment
             */
            @Override
            public void failed(Throwable exc, Void attachment) {
                System.out.println("client connect to server failed: " + exc);
                 
                try {
                    shutdown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }          
        });
	}
	
	public static void main(String[] args) throws IOException, InterruptedException {
		int sleepTime = Integer.parseInt(args[0]);
        Helper.sleep(sleepTime);
         
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
        //只能跑一个线程,第二个线程connect会挂住,暂时不明原因
        final int THREAD_NUM = 1;
        CountDownLatch latch = new CountDownLatch(THREAD_NUM);
         
        //创建个多线程模拟多个客户端,模拟失败,无效
        //只能通过命令行同时运行多个进程来模拟多个客户端
        for(int i=0; i<THREAD_NUM; i++){
        	SocketClient c = new SocketClient(channelGroup, latch);
            Thread t = new Thread(c);
            System.out.println(t.getName() + "---start");
            t.start();
            //让主线程等待子线程处理再退出, 这对于异步调用无效
            //t.join();
        }          
         
        latch.await();
         
        if(channelGroup !=null){
            channelGroup.shutdown();
        }
	}

}


运行步骤:
1、先在eclipse里运行SocketServer.java文件。
2、打开cmd窗口进入E:\BaiduYunDownload\workspaces\javaaio\bin目录。
3、E:\BaiduYunDownload\workspaces\javaaio\bin>java -classpath .\ com.shihuan.aio.client.SocketClient 1


eclipse中的server端控制台截图如下:

  • 大小: 48.9 KB
  • 大小: 38.9 KB
  • 大小: 45.6 KB
分享到:
评论

相关推荐

    Voovan开发手册

    - **JDK1.7兼容**:虽然Voovan项目基于JDK1.8开发,但为了满足部分用户需求,提供了JDK1.7版本的支持。 - **版本选择**:建议使用2016年8月9日之后的master分支或v1.0-alpha-2以上版本以确保兼容性。 - **降级步骤**...

    BIO,NIO,AIO,Netty面试题.pdf

    - JDK 1.7之后开始支持。 #### 6. Netty:高性能网络通信框架 - **定义**:Netty是一个基于NIO的网络通信框架,提供了异步、事件驱动的API,使得开发高性能、高可靠性的网络应用变得简单。 - **特点**: - 大大...

    个人心血吊打面试官 nb

    **八、JDK 1.7与JDK 1.8的区别** - **JDK 1.8**:引入了Lambda表达式和Stream API,极大地简化了函数式编程的使用。 - **HashMap底层改进**:在JDK 1.7中,HashMap采用了数组+链表的存储方式;而在JDK 1.8中,当...

    BIO,NIO,AIO,Netty面试题 35道.pdf

    AIO适用于连接数多且连接时间较长(如相册服务器)的场景,充分利用了操作系统的异步I/O能力,JDK 1.7开始支持。 4. **Netty** Netty是一个基于NIO的高性能、高可靠性的网络通信框架。它提供了事件驱动的网络应用...

    BIO,NIO,AIO,Netty面试题 35道,面试通关宝典

    这种方式使用于连接数目多且连接比较长(重操作)的架构,充分调用操作系统参与并发操作,编程比较复杂,JDK1.7 之后开始支持。 Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用...

    浅谈Java中BIO、NIO和AIO的区别和应用场景

    AIO是JDK1.7中引入的真正意义上的异步非阻塞IO模型,服务器的实现模式为多个有效请求一个线程,客户端的IO请求都是由OS先完成再通知服务器应用去启动线程处理(回调)。AIO是Java中最先进的IO模型,它可以极大地提高...

    Linux虚拟机centos7安装oracle weblogic jdk 部署项目等步骤.docx

    使用命令 `rpm -e --nodeps java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64` 等卸载 openJDK。 2. 下载 JDK 安装包:下载 JDK 安装包 jdk-6u10-linux-x64.bin,並将其存储在 /home/jinghaoxin/Downloads 目录下。...

    Java中网络IO的实现方式(BIO、NIO、AIO)介绍

    AIO方式是JDK1.7中提出的。它的工作原理是:采用Proactor模式。AIO的通知是发生在读写等处理之后的回调,有通知时表示相关操作已经结束了。在进行读写操作时,只需要调用相应的read/write方法,并传入...

    java简易版开心农场源码-t-io:t-i

    master分支(默认分支)只支持jdk1.8了,想要兼容jdk1.7的,请前往: 置顶链接 t-io: 不仅仅是百万级TCP长连接框架 t-io是基于jdk aio实现的易学易用、稳定、性能强悍、将多线程运用到极致、内置功能丰富的即时通讯...

    Java三种IO模型原理实例详解

    AIO是在JDK1.7中引入的,基于异步IO思想来完成的。在AIO模式下,应用程序可以直接调用API的read或write方法,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法...

    2018中级java工程师面试总结

    3. **JDK 1.7 新特性**: - 二进制字面量:可以直接用二进制形式(0b/0B开头)定义整数。 - 数字字面量中的下划线:可以在数字中间插入下划线,方便阅读,不改变数值。 - `switch`语句支持字符串:可以基于字符串...

    JAVA IO的3种类型区别解析

    AIO,也称为NIO.2,自JDK 1.7开始提供。它是真正的异步IO,应用程序发起读写操作后,无需等待,可以立即返回继续其他工作。操作系统会在数据准备好时调用预先注册的回调函数,进行实际的读写操作。AIO使得应用程序...

    北科Java新面试宝典.pdf

    1. JDK1.7、1.8的JVM内存模型:包括堆、栈、方法区、程序计数器等区域。 2. 类加载过程和双亲委派模型:类加载过程包括加载、验证、准备、解析和初始化五个阶段,双亲委派模型是指类加载器之间的委派关系。 3. 堆和...

    NC_linux安装was85部署手册.docx

    12. JDK 版本配置:需要配置 JDK 版本,设置为 JAVA1.7,以便于 WAS8.5 的正常运行。 13. 关闭 AIO:在一些场景下,AIO 可能会占用大量的本地内存,因此需要关闭 AIO,以释放内存资源。 这些知识点涵盖了安装 WAS...

    java基础之IO流

    - **工作原理**:AIO是JDK 1.7引入的新特性,它允许应用程序发起异步的读写请求,并通过CompletionHandler来接收完成的通知。 - **优缺点**: - **优点**:进一步提高了系统的并发能力,减少了对线程的依赖。 - ...

    Java面经-百度banbne-副本.pdf

    JDK 1.7和1.8版本的ConcurrentHashMap原理有所区别。主要区别在于1.8版本采用了Node数组+链表+红黑树的数据结构,提高了并发性能。 3. **Hashtable与HashMap的区别**: Hashtable是线程安全的,而HashMap不是。...

Global site tag (gtag.js) - Google Analytics