- 浏览: 582978 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (174)
- JBPM (3)
- WWF (0)
- JavaScript (11)
- J2EE (40)
- OperationSystem (11)
- 数据库 (12)
- CSS (1)
- Ajax (2)
- J2SE (30)
- Tools (10)
- 服务器中间件 (3)
- 异常 (0)
- Flex (5)
- jQuery (11)
- html (9)
- Ejb (1)
- HTML5 Shiv–让该死的IE系列支持HTML5吧 (1)
- Spring (9)
- Quartz (3)
- log4j (1)
- maven (1)
- cpdetector (1)
- JSON (1)
- log4jdbc (1)
- asm (8)
- FusionCharts (1)
- jqplot (1)
- highcharts (1)
- excanvas (1)
- html5 (1)
- jpcap介绍 (1)
- weblogic (3)
- URLURLClassLoader (0)
- URLClassLoader (1)
- ant (2)
- ivy (2)
- nexus (1)
- IT (0)
- LoadRunner (1)
- SCSS (1)
- ruby (1)
- webstorm (1)
- typescript (1)
- Jboss7 (1)
- wildfly (1)
- oracle (5)
- esb (0)
- dubbo (2)
- zookeeper (3)
- eclipse (1)
- Android (2)
- Studio (1)
- Google (1)
- 微信 (1)
- 企业号 (1)
- Linux (13)
- Oracle12c (1)
- Hadoop (1)
- InletexEMC (1)
- Windows (1)
- Netty (3)
- Marshalling (2)
- Protobuf (1)
- gcc (1)
- Git (1)
- GitLab (1)
- shell (2)
- java (3)
- Spring4 (1)
- hibernate4 (1)
- postgresql (1)
- ApacheServer (2)
- Tomcat (2)
- ApacheHttpServer (2)
- realvnc (1)
- redhat (7)
- vncviewer (1)
- LVS (4)
- LVS-DR (1)
- RedHat6.5 (5)
- LVS-NAT (1)
- LVS-IPTUNNEL (2)
- LVS-TUN (1)
- keepalived (2)
- yum (1)
- iso (1)
- VMware (1)
- redhat5 (1)
- ha (1)
- nginx (2)
- proguard (1)
- Mat (1)
- DTFJ (1)
- axis2 (1)
- web service (1)
- centos (1)
- random (1)
- urandom (1)
- apache (1)
- IBM (1)
- cve (1)
- 漏洞 (1)
- JDBC (1)
- DataSource (1)
- jdk (1)
- tuxedo (2)
- wtc (1)
最新评论
-
skying007:
好资料,谢谢分享给啊
FusionCharts在服务器端导出图片(J2EE版) -
cgnnzg:
大神好 可以发一份源码给我学习么 多谢了 978241085 ...
springmvc+dubbo+zookeeper -
jifengjianhao:
求源码:854606899@qq.com
springmvc+dubbo+zookeeper -
wdloyeu:
shihuan8@163.com邮箱网盘在哪,没找到。能给份源 ...
Java Socket长连接示例代码 -
huangshangyuanji:
求代码:45613032@qq.com
springmvc+dubbo+zookeeper
工程结构图:
Helper.java文件内容如下:
CharsetHelper.java文件内容如下:
SocketServer.java文件内容如下:
SocketClient.java文件内容如下:
运行步骤:
1、先在eclipse里运行SocketServer.java文件。
2、打开cmd窗口进入E:\BaiduYunDownload\workspaces\javaaio\bin目录。
3、E:\BaiduYunDownload\workspaces\javaaio\bin>java -classpath .\ com.shihuan.aio.client.SocketClient 1
eclipse中的server端控制台截图如下:
Helper.java文件内容如下:
package com.shihuan.aio.util; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Helper { private static BlockingQueue<String> words; private static Random random; public Helper() throws InterruptedException{ words = new ArrayBlockingQueue<String>(5); words.put("hi"); words.put("who"); words.put("what"); words.put("where"); words.put("bye"); random = new Random(); } public String getWord(){ return words.poll(); } public void sleep() { try { TimeUnit.SECONDS.sleep(random.nextInt(3)); } catch (InterruptedException e) { e.printStackTrace(); } } public static void sleep(long l) { try { TimeUnit.SECONDS.sleep(l); } catch (InterruptedException e) { e.printStackTrace(); } } public static String getAnswer(String question){ String answer = null; switch(question){ case "who": answer = "我是小娜\n"; break; case "what": answer = "我是来帮你解闷的\n"; break; case "where": answer = "我来自外太空\n"; break; case "hi": answer = "hello\n"; break; case "bye": answer = "88\n"; break; default: answer = "请输入 who, 或者what, 或者where"; } return answer; } }
CharsetHelper.java文件内容如下:
package com.shihuan.aio.util; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; public class CharsetHelper { private static final String UTF_8 = "UTF-8"; private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder(); private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder(); public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException { return encoder.encode(in); } public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException { return decoder.decode(in); } }
SocketServer.java文件内容如下:
package com.shihuan.aio.server; import java.io.IOException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Executors; import com.shihuan.aio.util.Helper; public class SocketServer { private final AsynchronousServerSocketChannel server; //写队列,因为当前一个异步写调用还没完成之前,调用异步写会抛WritePendingException //所以需要一个写队列来缓存要写入的数据,这是AIO比较坑的地方 private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>(); private boolean writing = false; public SocketServer() throws IOException { //设置线程数为CPU核数 AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory()); server = AsynchronousServerSocketChannel.open(channelGroup); //重用端口 server.setOption(StandardSocketOptions.SO_REUSEADDR, true); //绑定端口并设置连接请求队列长度 server.bind(new InetSocketAddress(8383), 80); } public void listen() { System.out.println(Thread.currentThread().getName() + ": run in listen method" ); //开始接受第一个连接请求 server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){ @Override public void completed(AsynchronousSocketChannel channel, Object attachment) { System.out.println(Thread.currentThread().getName() + ": run in accept completed method" ); //先安排处理下一个连接请求,异步非阻塞调用,所以不用担心挂住了 //这里传入this是个地雷,小心多线程 server.accept(null, this); //处理连接读写 handle(channel); } private void handle(final AsynchronousSocketChannel channel) { System.out.println(Thread.currentThread().getName() + ": run in handle method" ); //每个AsynchronousSocketChannel,分配一个缓冲区 final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024); readBuffer.clear(); channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){ @Override public void completed(Integer count, Object attachment) { System.out.println(Thread.currentThread().getName() + ": run in read completed method" ); if(count > 0){ try{ readBuffer.flip(); //CharBuffer charBuffer = CharsetHelper.decode(readBuffer); CharBuffer charBuffer = Charset.forName("UTF-8").newDecoder().decode(readBuffer); String question = charBuffer.toString(); String answer = Helper.getAnswer(question); /*//写入也是异步调用,也可以使用传入CompletionHandler对象的方式来处理写入结果 //channel.write(CharsetHelper.encode(CharBuffer.wrap(answer))); try{ channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer))); } //Unchecked exception thrown when an attempt is made to write to an asynchronous socket channel and a previous write has not completed. //看来操作系统也不可靠 catch(WritePendingException wpe){ //休息一秒再重试,如果失败就不管了 Helper.sleep(1); channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer))); }*/ writeStringMessage(channel, answer); readBuffer.clear(); } catch(IOException e){ e.printStackTrace(); } } else{ try { //如果客户端关闭socket,那么服务器也需要关闭,否则浪费CPU channel.close(); } catch (IOException e) { e.printStackTrace(); } } //异步调用OS处理下个读取请求 //这里传入this是个地雷,小心多线程 channel.read(readBuffer, null, this); } /** * 服务器读失败处理 * @param exc * @param attachment */ @Override public void failed(Throwable exc, Object attachment) { System.out.println("server read failed: " + exc); if(channel != null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } }); } /** * 服务器接受连接失败处理 * @param exc * @param attachment */ @Override public void failed(Throwable exc, Object attachment) { System.out.println("server accept failed: " + exc); } }); } /** * Enqueues a write of the buffer to the channel. * The call is asynchronous so the buffer is not safe to modify after * passing the buffer here. * * @param buffer the buffer to send to the channel */ private void writeMessage(final AsynchronousSocketChannel channel, final ByteBuffer buffer) { boolean threadShouldWrite = false; synchronized(queue) { queue.add(buffer); // Currently no thread writing, make this thread dispatch a write if (!writing) { writing = true; threadShouldWrite = true; } } if (threadShouldWrite) { writeFromQueue(channel); } } private void writeFromQueue(final AsynchronousSocketChannel channel) { ByteBuffer buffer; synchronized (queue) { buffer = queue.poll(); if (buffer == null) { writing = false; } } // No new data in buffer to write if (writing) { writeBuffer(channel, buffer); } } private void writeBuffer(final AsynchronousSocketChannel channel, ByteBuffer buffer) { channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); } else { // Go back and check if there is new data to write writeFromQueue(channel); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("server write failed: " + exc); } }); } /** * Sends a message * @param string the message * @throws CharacterCodingException */ private void writeStringMessage(final AsynchronousSocketChannel channel, String msg) throws CharacterCodingException { writeMessage(channel, Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg))); } public static void main(String[] args) throws IOException { SocketServer socketServer = new SocketServer(); socketServer.listen(); } }
SocketClient.java文件内容如下:
package com.shihuan.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import com.shihuan.aio.util.CharsetHelper; import com.shihuan.aio.util.Helper; public class SocketClient implements Runnable { private AsynchronousSocketChannel channel; private Helper helper; private CountDownLatch latch; private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>(); private boolean writing = false; public SocketClient(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException { this.latch = latch; helper = new Helper(); initChannel(channelGroup); } private void initChannel(AsynchronousChannelGroup channelGroup) throws IOException { //在默认channel group下创建一个socket channel channel = AsynchronousSocketChannel.open(channelGroup); //设置Socket选项 channel.setOption(StandardSocketOptions.TCP_NODELAY, true); channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } private void shutdown() throws IOException { if(channel != null){ channel.close(); } latch.countDown(); } /** * Enqueues a write of the buffer to the channel. * The call is asynchronous so the buffer is not safe to modify after * passing the buffer here. * * @param buffer the buffer to send to the channel */ private void writeMessage(final ByteBuffer buffer) { boolean threadShouldWrite = false; synchronized(queue) { queue.add(buffer); // Currently no thread writing, make this thread dispatch a write if (!writing) { writing = true; threadShouldWrite = true; } } if (threadShouldWrite) { writeFromQueue(); } } private void writeFromQueue() { ByteBuffer buffer; synchronized (queue) { buffer = queue.poll(); if (buffer == null) { writing = false; } } // No new data in buffer to write if (writing) { writeBuffer(buffer); } } private void writeBuffer(ByteBuffer buffer) { channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); } else { // Go back and check if there is new data to write writeFromQueue(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { } }); } /** * Sends a message * @param string the message * @throws CharacterCodingException */ public void writeStringMessage(String msg) throws CharacterCodingException { writeMessage(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg))); } @Override public void run() { System.out.println(Thread.currentThread().getName() + "---run"); //连接服务器 channel.connect(new InetSocketAddress("localhost", 8383), null, new CompletionHandler<Void, Void>(){ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024); @Override public void completed(Void result, Void attachment) { //连接成功后, 异步调用OS向服务器写一条消息 try { //channel.write(CharsetHelper.encode(CharBuffer.wrap(helper.getWord()))); writeStringMessage(helper.getWord()); } catch (CharacterCodingException e) { e.printStackTrace(); } //helper.sleep();//等待写异步调用完成 readBuffer.clear(); //异步调用OS读取服务器发送的消息 channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){ @Override public void completed(Integer result, Object attachment) { try{ //异步读取完成后处理 if(result > 0){ readBuffer.flip(); CharBuffer charBuffer = CharsetHelper.decode(readBuffer); String answer = charBuffer.toString(); System.out.println(Thread.currentThread().getName() + "---" + answer); readBuffer.clear(); String word = helper.getWord(); if(word != null){ //异步写 //channel.write(CharsetHelper.encode(CharBuffer.wrap(word))); writeStringMessage(word); //helper.sleep();//等待异步操作 channel.read(readBuffer, null, this); } else{ //不想发消息了,主动关闭channel shutdown(); } } else{ //对方已经关闭channel,自己被动关闭,避免空循环 shutdown(); } } catch(Exception e){ e.printStackTrace(); } } /** * 读取失败处理 * @param exc * @param attachment */ @Override public void failed(Throwable exc, Object attachment) { System.out.println("client read failed: " + exc); try { shutdown(); } catch (IOException e) { e.printStackTrace(); } } }); } /** * 连接失败处理 * @param exc * @param attachment */ @Override public void failed(Throwable exc, Void attachment) { System.out.println("client connect to server failed: " + exc); try { shutdown(); } catch (IOException e) { e.printStackTrace(); } } }); } public static void main(String[] args) throws IOException, InterruptedException { int sleepTime = Integer.parseInt(args[0]); Helper.sleep(sleepTime); AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory()); //只能跑一个线程,第二个线程connect会挂住,暂时不明原因 final int THREAD_NUM = 1; CountDownLatch latch = new CountDownLatch(THREAD_NUM); //创建个多线程模拟多个客户端,模拟失败,无效 //只能通过命令行同时运行多个进程来模拟多个客户端 for(int i=0; i<THREAD_NUM; i++){ SocketClient c = new SocketClient(channelGroup, latch); Thread t = new Thread(c); System.out.println(t.getName() + "---start"); t.start(); //让主线程等待子线程处理再退出, 这对于异步调用无效 //t.join(); } latch.await(); if(channelGroup !=null){ channelGroup.shutdown(); } } }
运行步骤:
1、先在eclipse里运行SocketServer.java文件。
2、打开cmd窗口进入E:\BaiduYunDownload\workspaces\javaaio\bin目录。
3、E:\BaiduYunDownload\workspaces\javaaio\bin>java -classpath .\ com.shihuan.aio.client.SocketClient 1
eclipse中的server端控制台截图如下:
发表评论
-
JDK中的随机数机制探究
2017-05-31 21:25 1955今天有同事问起关于JDK1.8中默认是random问题 ... -
Netty5 AIO
2015-11-25 21:18 1692工程结构图: TimeServer.java文件内容如下: ... -
java JAVA_OPTS javaagent
2015-11-23 15:48 1521附件里java JAVA_OPTS -javaagent的程序 ... -
dubbo源代码调试
2015-09-24 17:06 4213今天同事要调试zookeeper+dubbo源代码,所以做了以 ... -
any+ivy的使用
2014-11-29 16:25 2111javapro-用ivy编译版本.rar文件时示例代码。 i ... -
URLClassLoader初体验
2014-11-26 14:56 2204使用概要: File file = ... -
将输出流OutputStream转化为输入流InputStream的方法
2014-07-19 22:27 27684将输出流OutputStream转化为输入流InputSt ... -
JVM启动参数大全
2014-06-11 17:19 1004http://www.blogjava.net/midstr/ ... -
Java实现aop案例
2014-04-27 11:33 1361MyPersonService.java代码如下: pac ... -
Java Socket长连接示例代码
2014-04-07 13:42 21787SocketListenerPusher.java代码如下: ... -
Java多线程文档
2013-10-06 10:56 1231附件里是Java多线程的pdf文档,写的比较好,可参考。 ... -
利用jackson包进行json字符串与pojo类之间的转换源代码
2012-10-05 19:35 1972附件里是利用jackson包进行json字符串与pojo类之间 ... -
Java Swing开发的文件内容加密
2012-09-02 14:52 1227主要是javax.swing.JFrame和java.awt. ... -
mysql和Oracle在对clob和blob字段的处理
2012-08-25 18:50 30920一、MySQL与Oracle数据库如何处理Clob,Bl ... -
用JAVA Bean 反射得到set,get方法
2011-10-17 19:39 9096/** * java反射bean的get方法 ... -
Oracle自带连接池应用
2011-08-31 13:27 2066依赖ojdbc14.jar包 OraclePooled.ja ... -
利用commons-digester3-30.jar解析xml文件
2011-08-28 14:05 3977此方式解析需要commons-beanutils-1.8.3. ... -
非阻塞的HTTP服务器
2011-08-27 21:23 1352附件里为非阻塞HTTP服务器的相关Java代码。 HttpS ... -
Java.nio
2011-08-27 21:17 1416附件里为阻塞模式、非阻塞模式、阻塞和非阻塞的混合模式代码。 ... -
Java中数字转大写货币(支持到千亿)
2011-08-17 20:20 2843public class MoneyToOther { / ...
相关推荐
- **JDK1.7兼容**:虽然Voovan项目基于JDK1.8开发,但为了满足部分用户需求,提供了JDK1.7版本的支持。 - **版本选择**:建议使用2016年8月9日之后的master分支或v1.0-alpha-2以上版本以确保兼容性。 - **降级步骤**...
- JDK 1.7之后开始支持。 #### 6. Netty:高性能网络通信框架 - **定义**:Netty是一个基于NIO的网络通信框架,提供了异步、事件驱动的API,使得开发高性能、高可靠性的网络应用变得简单。 - **特点**: - 大大...
**八、JDK 1.7与JDK 1.8的区别** - **JDK 1.8**:引入了Lambda表达式和Stream API,极大地简化了函数式编程的使用。 - **HashMap底层改进**:在JDK 1.7中,HashMap采用了数组+链表的存储方式;而在JDK 1.8中,当...
AIO适用于连接数多且连接时间较长(如相册服务器)的场景,充分利用了操作系统的异步I/O能力,JDK 1.7开始支持。 4. **Netty** Netty是一个基于NIO的高性能、高可靠性的网络通信框架。它提供了事件驱动的网络应用...
这种方式使用于连接数目多且连接比较长(重操作)的架构,充分调用操作系统参与并发操作,编程比较复杂,JDK1.7 之后开始支持。 Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用...
AIO是JDK1.7中引入的真正意义上的异步非阻塞IO模型,服务器的实现模式为多个有效请求一个线程,客户端的IO请求都是由OS先完成再通知服务器应用去启动线程处理(回调)。AIO是Java中最先进的IO模型,它可以极大地提高...
使用命令 `rpm -e --nodeps java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64` 等卸载 openJDK。 2. 下载 JDK 安装包:下载 JDK 安装包 jdk-6u10-linux-x64.bin,並将其存储在 /home/jinghaoxin/Downloads 目录下。...
AIO方式是JDK1.7中提出的。它的工作原理是:采用Proactor模式。AIO的通知是发生在读写等处理之后的回调,有通知时表示相关操作已经结束了。在进行读写操作时,只需要调用相应的read/write方法,并传入...
master分支(默认分支)只支持jdk1.8了,想要兼容jdk1.7的,请前往: 置顶链接 t-io: 不仅仅是百万级TCP长连接框架 t-io是基于jdk aio实现的易学易用、稳定、性能强悍、将多线程运用到极致、内置功能丰富的即时通讯...
AIO是在JDK1.7中引入的,基于异步IO思想来完成的。在AIO模式下,应用程序可以直接调用API的read或write方法,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法...
3. **JDK 1.7 新特性**: - 二进制字面量:可以直接用二进制形式(0b/0B开头)定义整数。 - 数字字面量中的下划线:可以在数字中间插入下划线,方便阅读,不改变数值。 - `switch`语句支持字符串:可以基于字符串...
AIO,也称为NIO.2,自JDK 1.7开始提供。它是真正的异步IO,应用程序发起读写操作后,无需等待,可以立即返回继续其他工作。操作系统会在数据准备好时调用预先注册的回调函数,进行实际的读写操作。AIO使得应用程序...
1. JDK1.7、1.8的JVM内存模型:包括堆、栈、方法区、程序计数器等区域。 2. 类加载过程和双亲委派模型:类加载过程包括加载、验证、准备、解析和初始化五个阶段,双亲委派模型是指类加载器之间的委派关系。 3. 堆和...
12. JDK 版本配置:需要配置 JDK 版本,设置为 JAVA1.7,以便于 WAS8.5 的正常运行。 13. 关闭 AIO:在一些场景下,AIO 可能会占用大量的本地内存,因此需要关闭 AIO,以释放内存资源。 这些知识点涵盖了安装 WAS...
- **工作原理**:AIO是JDK 1.7引入的新特性,它允许应用程序发起异步的读写请求,并通过CompletionHandler来接收完成的通知。 - **优缺点**: - **优点**:进一步提高了系统的并发能力,减少了对线程的依赖。 - ...
JDK 1.7和1.8版本的ConcurrentHashMap原理有所区别。主要区别在于1.8版本采用了Node数组+链表+红黑树的数据结构,提高了并发性能。 3. **Hashtable与HashMap的区别**: Hashtable是线程安全的,而HashMap不是。...