- 浏览: 216023 次
- 性别:
- 来自: 哈尔滨
文章分类
最新评论
-
lizhenzhendebishe:
提示An error was discovered proce ...
WebService:Axis客户端调用需要身份验证的CXF服务 -
yuanliangding:
学习了。不太接触底层的东西
UNIX系统的IO模型 -
_copythat:
加油,。。。
阳光总会在风雨之后洒向苍茫 -
donlianli:
莫非去淘宝菜鸟网络了?
阳光总会在风雨之后洒向苍茫 -
菜鸟小于:
我也是哈尔滨的,在广州做了三年的开发,可是实际上我们是在维护一 ...
阳光总会在风雨之后洒向苍茫
之前在IBM的网站上看到过一篇介绍NIO的文章,收获很大。但文中的代码只适合短连接的情况,长连接时就不适用了。
最近恰好要写一个处理长连接的服务,接收日志包,然后打包成syslog形式再转发,所以在它的基础上改了一下。
主要改了两个类,一个是Server,因为我们只关注read事件,所以write事件我们暂不处理。另外,在处理完ON_READ事件后,不能执行key.cancel()。
package nioserver; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Set; /** * <p>Title: 主控服务线程,采用NIO实现的长连接服务器</p> * @author sxl * @version 1.0 */ public class Server implements Runnable { private static Selector selector; private ServerSocketChannel sschannel; private InetSocketAddress address; protected Notifier notifier; private int port; /** * 创建主控服务线程 * @param port 服务端口 * @throws java.lang.Exception */ public static int MAX_THREADS = 4; public Server(int port) throws Exception { this.port = port; // 获取事件触发器 notifier = Notifier.getNotifier(); // 创建读写线程池 for (int i = 0; i < MAX_THREADS; i++) { Thread r = new Reader(); Thread w = new Writer(); Thread sys = new Syslog(); r.start(); w.start(); sys.start(); } // 创建无阻塞网络套接 selector = Selector.open(); sschannel = ServerSocketChannel.open(); sschannel.configureBlocking(false); address = new InetSocketAddress(port); ServerSocket ss = sschannel.socket(); ss.bind(address); sschannel.register(selector, SelectionKey.OP_ACCEPT); } public void run() { System.out.println("Server started ..."); System.out.println("Server listening on port: " + port); // 监听 while (true) { try { int num = 0; num = selector.select(); if (num > 0) { Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); it.remove(); // 处理IO事件 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); notifier.fireOnAccept(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // 触发接受连接事件 Request request = new Request(sc); notifier.fireOnAccepted(request); // 注册读操作,以进行下一步的读操作 sc.register(selector, SelectionKey.OP_READ, request); } else if (key.isReadable()) { Reader.processRequest(key); // 提交读服务线程读取客户端数据 } } } } catch (Exception e) { continue; } } } }
另一个改动的类是Reader,改变对-1的处理,这里不是break,而是抛出异常。在读取完buffer的数据后,将数据包传递给另外两个线程进行syslog的发送和入库操作。
package nioserver; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.LinkedList; import java.util.List; /** * <p>Title: 读线程</p> * <p>Description: 该线程用于读取客户端数据</p> * @author sxl * @version 1.0 */ public class Reader extends Thread { private static List pool = new LinkedList(); private static Notifier notifier = Notifier.getNotifier(); public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); } // 读取数据 read(key); } catch (Exception e) { continue; } } } /** * 读取客户端发出请求数据 * @param sc 套接通道 */ private static int BUFFER_SIZE = 1024; public static byte[] readRequest(SocketChannel sc) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); int off = 0; int r = 0; byte[] data = new byte[BUFFER_SIZE * 10]; while ( true ) { buffer.clear(); r = sc.read(buffer); if(r == 0) break; if(r == -1)//如果是流的末尾,则抛出异常 throw new IOException(); if ((off + r) > data.length) {//数组扩容 data = grow(data, BUFFER_SIZE * 10); } byte[] buf = buffer.array(); System.arraycopy(buf, 0, data, off, r); off += r; } byte[] req = new byte[off]; System.arraycopy(data, 0, req, 0, off); return req; } /** * 处理连接数据读取 * @param key SelectionKey */ public void read(SelectionKey key) { SocketChannel sc = null; try { // 读取客户端数据 sc = (SocketChannel) key.channel(); byte[] clientData = readRequest(sc); if(clientData.length > 0){//有数据才处理 Request request = (Request)key.attachment(); request.setDataInput(clientData); // 提交到数据库写入线程 Writer.processRequest(request); // 提交到Syslog线程,发送syslog Syslog.processRequest(request); } } catch (Exception e) { if(sc != null) try { sc.socket().close(); sc.close(); } catch (IOException e1) { e1.printStackTrace(); } } } /** * 处理客户请求,管理用户的联结池,并唤醒队列中的线程进行处理 */ public static void processRequest(SelectionKey key) { synchronized (pool) { pool.add(pool.size(), key); pool.notifyAll(); } } /** * 数组扩容 * @param src byte[] 源数组数据 * @param size int 扩容的增加量 * @return byte[] 扩容后的数组 */ public static byte[] grow(byte[] src, int size) { byte[] tmp = new byte[src.length + size]; System.arraycopy(src, 0, tmp, 0, src.length); return tmp; } }
评论
5 楼
zk1878
2014-01-14
楼主 这代码完整么? Notifier 这个是啥类,还有Reader 是个线程,然后又有静态方法的,感觉写的很乱呀
4 楼
linginfanta
2013-09-24
有问题,一直OP_ACCEPT...和OP_READ...。CPU百分之几百。
3 楼
Rong_it
2013-05-14
这个里面有个问题就是某个连接持续发送过程中, 某个SelectioneKey在阻塞过程中Reader.processRequest(key); 一直会被触发,而且添加到List pool = new LinkedList(); 中,可以在Reader.processRequest(key); 前面加条日志看看~
2 楼
378629846
2012-09-13
48150084 写道
做过压力测试吗?非常怀疑会死锁。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。
谢谢!是应该换成LinkedBlockingQueue,代码简洁多了。
1 楼
48150084
2012-09-13
做过压力测试吗?非常怀疑会死锁。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。
发表评论
-
使用zookeeper实现分布式共享锁
2013-04-13 15:59 2606分布式系统中经常需要协调多进程,多个jvm,或者多台机器之间 ... -
WebService:Axis客户端调用需要身份验证的CXF服务
2012-11-24 12:05 7077CXF服务端代码: 1、web.xml配置 < ... -
使用axis轻松调用Webservice
2012-11-10 13:15 22001使用axis1.4调用webservice有两种简单的方式: ... -
执行java程序时如何引用依赖的jar
2012-11-10 12:38 1793在执行java程序时我们可以通过-Djava.ext.dirs ... -
Log4j简单实用配置
2012-10-27 12:22 3675#A1为控制台输出,A2为文件输出,R为文件输出,并且按 ... -
Int和byte数组之间的转换
2012-08-27 20:25 18823有时候和C的程序通信的时候,我们在封装协议时,可能需要将Jav ... -
用闭锁测试HashMap的并发写入问题
2012-08-27 19:39 3667今天无意中看到以前写 ... -
LDAP查询分页,基于迭代器的查询分页
2012-08-25 17:26 8622LDAP服务器端可以支持分页查询,但是有个前提条件 ... -
并行计算框架的Java实现--系列三
2012-07-14 14:38 4170接上篇并行计算框架的Java实现--系列二 优化锁,之 ... -
并行计算框架的Java实现--系列二
2012-07-14 08:41 2941接上篇并行计算框架的J ... -
并行计算框架的Java实现--系列一
2012-07-14 07:38 3518最近的工作需要统计一些复杂的报表,为了提高效率,想用多线程去实 ... -
ant初探
2012-06-29 13:38 2919前些天和同事交流,他说ant非常好用,他一直在用,学习资料共享 ... -
基于事件的 NIO 多线程服务器
2012-06-27 17:06 1286JDK1.4 的 NIO 有效解决了原有流式 IO 存在的线程 ... -
Java NIO学习
2012-06-22 22:46 0端午节加班要开发一个SocketServer,需要承载1000 ... -
httpclient访问https服务,可以信任证书
2012-06-02 00:48 6858private HttpClient initHttpClie ... -
Java或Web工程中查找配置文件
2012-06-02 00:35 1072String path = ""; URL ... -
如何获取真实的终端IP
2011-11-24 11:16 1224在有Apache做负载均衡的时候使用request.getRe ... -
MANIFEST.MF的应用以及如何读取jar包外的log4j.properties
2011-11-22 18:21 34MANIFEST.MF是jar文件的配置文件,在用eclips ... -
java实现的telnet协议
2011-10-02 17:14 7400package telnet; import j ... -
很好用的java反编译工具
2011-10-02 16:36 1252可以反编译单个文件,也可以反编译整个jar
相关推荐
Java NIO(New IO)是Java 1.4版本引入的一个新特性,它提供了一种新的I/O操作方式,与传统的BIO(Blocking IO)模型相比,NIO具有更高的并发性能,尤其在处理大量连接请求时更为明显。NIO的核心在于非阻塞I/O,即在...
自Java 1.4版本引入NIO后,它为Java开发者提供了更高效的数据传输方式,尤其是在处理大量并发连接时。NIO的核心在于通道(Channels)和缓冲区(Buffers)的概念,与传统的流(Streams)有所不同。 1. **通道...
6. **多路复用器(Multiplexing)**:Java NIO的选择器实现了I/O多路复用,即单个线程可以同时处理多个连接,这在处理大量并发连接时非常有用。 7. **管道(Pipe)**:管道是两个线程间进行单向数据传输的通道。一...
Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java...Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。
Java NIO(非阻塞I/O)是一种在Java中实现高效I/O操作的方式,相比于传统的BIO(阻塞I/O),NIO提供了更强大的数据传输能力,尤其适用于高并发、低延迟的网络应用,如服务器长连接场景。在这个主题中,我们将深入...
Java NIO相对于IO,在处理高并发、大数据量的场景下表现出更好的性能。IO模型更适合于顺序读写和少量连接,而NIO则更适合于高并发、短连接的场景。 **NIO技巧与陷阱:** 1. **缓冲区管理**:合理使用缓冲区可以提高...
- **关闭策略**:合理设置超时时间,当连接长时间无数据传输时,可以考虑关闭连接以释放资源。 **4. 应用场景** 短连接适合一次性、低延迟、资源有限的场景,如网页浏览。而长连接适合实时性强、需要持续交互的...
### Java NIO 处理超大数据文件的知识点详解 ...综上所述,使用Java NIO处理超大数据文件时,关键是利用好内存映射文件技术和合理的数据读取策略,通过适当的分块和数据解析方法,可以有效地提升读取速度和处理能力。
而Java NIO引入了选择器(Selector)和通道(Channel)的概念,允许单个线程同时处理多个连接,大大提高了系统在高并发环境下的性能。 本例子中的"NioServer"可能是一个简单的Java NIO服务器端程序,用于演示如何...
传统的Java I/O模型(BIO)在处理大量并发连接时效率较低,因为它基于阻塞模式,一个线程只能处理一个连接,而NIO则允许单个线程同时处理多个连接,大大提高了性能。 `NIOServer.java`和`NIOClient.java`这两个文件...
NIO通过选择器实现多路复用,可以同时处理多个连接,这在高并发服务器中非常有用。例如,一个服务器可以监听多个SocketChannel,而无需为每个客户端连接创建一个新的线程,显著降低了线程创建和销毁的开销。 5. **...
- Java NIO通过非阻塞的方式和选择器,使得处理大量并发连接变得更加高效,尤其适合服务器端的高并发场景。 综上所述,Java NIO是一个强大的I/O库,它为开发者提供了更高效、灵活的I/O操作,特别是在处理高并发、...
// 处理连接请求 SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } else if (key.is...
Java NIO(New Input/Output)是Java标准库提供的一种I/O模型,它与传统的 Blocking I/O(BIO)模型不同,NIO提供了非阻塞的读写方式,提高了系统在处理大量并发连接时的效率。在这个“基于java NIO的socket通信demo...
通道是Java NIO中用于I/O操作的连接,它表示到实体的开放连接,例如硬件设备、文件、网络套接字或程序组件。通道可以处于打开或关闭状态,一旦关闭了某个通道,试图对其调用I/O操作就会导致ClosedChannelException被...