`

基于事件的NIO多线程服务器

 
阅读更多

JDK1.4的NIO有效解决了原有流式IO存在的线程开销的问题,在NIO中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个CPU的处理能力和处理中的等待时间,达到提高服务能力的目的。

AD:

JDK1.4的NIO有效解决了原有流式IO存在的线程开销的问题,在NIO中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个CPU的处理能力和处理中的等待时间,达到提高服务能力的目的。

线程模型

NIO的选择器采用了多路复用(Multiplexing)技术,可在一个选择器上处理多个套接字,通过获取读写通道来进行IO操作。由于网络带宽 等原因,在通道的读、写操作中是容易出现等待的,所以在读、写操作中引入多线程,对性能提高明显,而且可以提高客户端的感知服务质量。所以本文的模型将主 要通过使用读、写线程池来提高与客户端的数据交换能力。

同时整个服务端的流程处理,建立于事件机制上。在 [接受连接->读->业务处理->写 ->关闭连接 ]这个过程中,触发器将触发相应事件,由事件处理器对相应事件分别响应,完成服务器端的业务处理。
下面我们就来详细看一下这个模型的各个组成部分。

相关事件定义 在这个模型中,我们定义了一些基本的事件:

(1)onAccept:

当服务端收到客户端连接请求时,触发该事件。通过该事件我们可以知道有新的客户端呼入。该事件可用来控制服务端的负载。例如,服务器可设定同时只为一定数量客户端提供服务,当同时请求数超出数量时,可在响应该事件时直接抛出异常,以拒绝新的连接。

(2)onAccepted:

当客户端请求被服务器接受后触发该事件。该事件表明一个新的客户端与服务器正式建立连接。

(3)onRead:

当客户端发来数据,并已被服务器控制线程正确读取时,触发该事件。该事件通知各事件处理器可以对客户端发来的数据进行实际处理了。需要注意的是,在 本模型中,客户端的数据读取是由控制线程交由读线程完成的,事件处理器不需要在该事件中进行专门的读操作,而只需将控制线程传来的数据进行直接处理即可。

(4)onWrite:

当客户端可以开始接受服务端发送数据时触发该事件,通过该事件,我们可以向客户端发送回应数据。在本模型中,事件处理器只需要在该事件中设置 。

(5)onClosed:

当客户端与服务器断开连接时触发该事件。

(6)onError:

当客户端与服务器从连接开始到最后断开连接期间发生错误时触发该事件。通过该事件我们可以知道有什么错误发生。

事件回调机制的实现

在这个模型中,事件采用广播方式,也就是所有在册的事件处理器都能获得事件通知。这样可以将不同性质的业务处理,分别用不同的处理器实现,使每个处理器的业务功能尽可能单一。

如下图:整个事件模型由监听器、事件适配器、事件触发器、事件处理器组成。

(事件模型)

1.监听器(Serverlistener):

这是一个事件接口,定义需监听的服务器事件,如果您需要定义更多的事件,可在这里进行扩展。

  1. public   interface  Serverlistener  
  2. public   void  onError(String error); 
  3. public   void  onAccept()  throws  Exception; 
  4. public   void  onAccepted(Request request)  throws  Exception; 
  5. public   void  onRead(Request request)  throws  Exception; 
  6. public   void  onWrite(Request request, Response response)  throws  Exception; 
  7. public   void  onClosed(Request request)  throws  Exception; 

2. 事件适配器(EventAdapter):

对Serverlistener接口实现一个适配器(EventAdapter),这样的好处是最终的事件处理器可以只处理所关心的事件。

  1. public   abstract   class  EventAdapter  
  2. implements  Serverlistener  
  3. public  EventAdapter() {} 
  4. public   void  onError(String error) {} 
  5. public   void  onAccept()  throws  Exception {} 
  6. public   void  onAccepted(Request request)  throws  Exception {} 
  7. public   void  onRead(Request request)  throws  Exception {} 
  8. public   void  onWrite(Request request, Response response)  throws  Exception {} 
  9. public   void  onClosed(Request request)  throws  Exception {} 

3. 事件触发器(Notifier):

用于在适当的时候通过触发服务器事件,通知在册的事件处理器对事件做出响应。触发器以Singleton模式实现,统一控制整个服务器端的事件,避免造成混乱。

  1. public   class  Notifier  
  2. private   static  Arraylist listeners =  null
  3. private   static  Notifier instance =  null
  4.  
  5. private  Notifier()  
  6. listeners = new  Arraylist(); 
  7.  
  8. /**  
  9. * 获取事件触发器  
  10. * @return 返回事件触发器  
  11. */  
  12. public   static   synchronized  Notifier  
  13. getNotifier()  
  14. if  (instance ==  null )  
  15. instance = new  Notifier(); 
  16. return  instance; 
  17. else   
  18. return  instance; 
  19.  
  20. /**  
  21. * 添加事件监听器  
  22. * @param l 监听器  
  23. */  
  24. public   void  addlistener(Serverlistener l) 
  25. synchronized  (listeners) 
  26. if  (!listeners.contains(l)) 
  27. listeners.add(l); 
  28.  
  29. public   void  fireOnAccept()  
  30. throws  Exception  
  31. for  ( int  i = listeners.size() -  1 ;  
  32. i >= 0 ; i--) 
  33. ( (Serverlistener) listeners. 
  34. get(i)).onAccept(); 
  35. // other fire method  

4. 事件处理器(Handler):

继承事件适配器,对感兴趣的事件进行响应处理,实现业务处理。以下是一个简单的事件处理器实现,它响应onRead事件,在终端打印出从客户端读取的数据。

  1. public   class  ServerHandler  
  2. extends  EventAdapter  
  3. public  ServerHandler() {} 
  4.  
  5. public   void  onRead(Request request)  
  6. throws  Exception  
  7. System.out.println("Received: "  + 
  8. new  String(data)); 

5. 事件处理器的注册。

为了能让事件处理器获得服务线程的事件通知,事件处理器需在触发器中注册。

  1. ServerHandler handler =  new  ServerHandler(); 
  2. Notifier.addlistener(handler); 

实现NIO多线程服务器

NIO多线程服务器主要由主控服务线程、读线程和写线程组成。

1. 主控服务线程(Server):

主控线程将创建读、写线程池,实现监听、接受客户端请求,同时将读、写通道提交由相应的读线程(Reader)和写服务线程(Writer),由读写线程分别完成对客户端数据的读取和对客户端的回应操作。

  1. public   class  Server  implements  Runnable 
  2. private   static  List wpool =  new  LinkedList();  
  3. private   static  Selector selector; 
  4. private  ServerSocketChannel sschannel; 
  5. private  InetSocketAddress address; 
  6. protected  Notifier notifier; 
  7. private   int  port; 
  8. private   static   int  MAX_THREADS =  4
  9.  
  10. /**  
  11. * Creat the main thread  
  12. * @param port server port  
  13. * @throws java.lang.Exception  
  14. */  
  15. public  Server( int  port)  throws  Exception 
  16. this .port = port; 
  17.  
  18. // event dispatcher  
  19. notifier = Notifier.getNotifier(); 
  20.  
  21. // create the thread pool for reading and writing  
  22. for  ( int  i =  0 ; i < MAX_THREADS; i++) 
  23. Thread r = new  Reader(); 
  24. Thread w = new  Writer(); 
  25. r.start(); 
  26. w.start(); 
  27.  
  28. // create nonblocking socket  
  29. selector = Selector.open(); 
  30. sschannel = ServerSocketChannel.open(); 
  31. sschannel.configureBlocking(false ); 
  32. address = new  InetSocketAddress(port); 
  33. ServerSocket ss = sschannel.socket(); 
  34. ss.bind(address); 
  35. sschannel.register(selector, SelectionKey.OP_ACCEPT); 
  36.  
  37. public   void  run() 
  38. System.out.println("Server started " ); 
  39. System.out.println("Server listening on port: "  + port); 
  40.  
  41. while  ( true
  42. try  
  43. int  num =  0
  44. num = selector.select(); 
  45.  
  46. if  (num >  0
  47. Set selectedKeys = selector.selectedKeys(); 
  48. Iterator it = selectedKeys.iterator(); 
  49. while  (it.hasNext()) 
  50. SelectionKey key = (SelectionKey) it.next(); 
  51. it.remove(); 
  52.  
  53. if  ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) 
  54. // Accept the new connection  
  55. ServerSocketChannel ssc =  
  56. (ServerSocketChannel) key.channel(); 
  57. notifier.fireOnAccept(); 
  58.  
  59. SocketChannel sc = ssc.accept(); 
  60. sc.configureBlocking(false ); 
  61.  
  62. Request request = new  Request(sc); 
  63. notifier.fireOnAccepted(request); 
  64.  
  65. sc.register(selector, SelectionKey.OP_READ,request); 
  66. }  
  67. else   if  ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) 
  68. Reader.processRequest(key);  
  69. key.cancel(); 
  70. }  
  71. else   if  ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) 
  72. Writer.processRequest(key); 
  73. key.cancel(); 
  74. }  
  75. //this selector's wakeup method is invoked  
  76. else  
  77. //register new channel for writing to selector  
  78. addRegister();  
  79. }  
  80. catch  (Exception e) 
  81. notifier.fireOnError("Error occured in Server: "  
  82. + e.getMessage()); 
  83. continue
  84.  
  85. private   void  addRegister() 
  86. synchronized  (wpool) 
  87. while  (!wpool.isEmpty()) 
  88. SelectionKey key = (SelectionKey) wpool.remove(0 ); 
  89. SocketChannel schannel = (SocketChannel) key.channel(); 
  90. try  
  91. schannel.register(selector, SelectionKey.OP_WRITE, key 
  92. .attachment()); 
  93. catch  (Exception e) 
  94. try  
  95. schannel.finishConnect(); 
  96. schannel.close(); 
  97. schannel.socket().close(); 
  98. notifier.fireOnClosed((Request) key.attachment()); 
  99. }  
  100. catch  (Exception e1) 
  101. notifier.fireOnError("Error occured in addRegister: "  
  102. + e.getMessage()); 
  103.  
  104. public   static   void  processWriteRequest(SelectionKey key) 
  105. synchronized  (wpool) 
  106. wpool.add(wpool.size(), key); 
  107. wpool.notifyAll(); 
  108. selector.wakeup();  

2. 读线程(Reader):

使用线程池技术,通过多个线程读取客户端数据,以充分利用网络数据传输的时间,提高读取效率。

  1. public   class  Reader  extends  Thread  
  2. public   void  run()  
  3. while  ( true )  
  4. try   
  5. SelectionKey key; 
  6. synchronized  (pool)  
  7. while  (pool.isEmpty())  
  8. pool.wait(); 
  9. key = (SelectionKey) pool.remove(0 ); 
  10. // 读取客户端数据,并触发onRead事件  
  11. read(key);  
  12. catch  (Exception e)  
  13. continue

3. 写线程(Writer):

和读操作一样,使用线程池,负责将服务器端的数据发送回客户端。

  1. public   final   class  Writer  extends  Thread  
  2. public   void  run()  
  3. while  ( true )  
  4. try   
  5. SelectionKey key; 
  6. synchronized  (pool)  
  7. while  (pool.isEmpty())  
  8. pool.wait(); 
  9. key = (SelectionKey) pool.remove(0 ); 
  10.  
  11. write(key);  
  12. catch  (Exception e)  
  13. continue

具体应用

NIO多线程模型的实现告一段落,现在我们可以暂且将NIO的各个API和烦琐的调用方法抛于脑后,专心于我们的实际应用中。

我们用一个简单的TimeServer(时间查询服务器)来看看该模型能带来多么简洁的开发方式。

在这个TimeServer中,将提供两种语言(中文、英文)的时间查询服务。我们将读取客户端的查询命令(GB/EN),并回应相应语言格式的当 前时间。在应答客户的请求的同时,服务器将进行日志记录。做为示例,对日志记录,我们只是简单地将客户端的访问时间和IP地址输出到服务器的终端上。

1. 实现时间查询服务的事件处理器(TimeHandler):

  1. public   class  TimeHandler  extends  EventAdapter  
  2. public  TimeHandler() {} 
  3.  
  4. public   void  onWrite(Request request, Response response)  throws  Exception  
  5. String command = new  String(request.getDataInput()); 
  6. String time = null
  7. Date date = new  Date(); 
  8.  
  9. if  (command.equals( "GB" ))  
  10. DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL, 
  11. DateFormat.FulL, Locale.CHINA); 
  12. time = cnDate.format(date); 
  13. else   
  14. DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL, 
  15. DateFormat.FulL, Locale.US); 
  16. time = enDate.format(date); 
  17.  
  18. response.send(time.getBytes()); 

2. 实现日志记录服务的事件处理器(LogHandler):

  1. public   class  LogHandler  extends  EventAdapter  
  2. public  LogHandler() {} 
  3.  
  4. public   void  onClosed(Request request)  
  5. throws  Exception  
  6. String log = new  Date().toString() +  " from "  + request.getAddress().toString(); 
  7. System.out.println(log); 
  8.  
  9. public   void  onError(String error)  
  10. System.out.println("Error: "  + error); 

3. 启动程序:

  1. public   class  Start  
  2.  
  3. public   static   void  main(String[] args)  
  4. try   
  5. LogHandler loger = new  LogHandler(); 
  6. TimeHandler timer = new  TimeHandler(); 
  7. Notifier notifier = Notifier.getNotifier(); 
  8. notifier.addlistener(loger); 
  9. notifier.addlistener(timer); 
  10.  
  11. System.out.println("Server starting " ); 
  12. Server server = new  Server( 5100 ); 
  13. Thread tServer = new  Thread(server); 
  14. tServer.start(); 
  15. catch  (Exception e)  
  16. System.out.println("Server error: "  + e.getMessage()); 
  17. System.exit(-1 ); 

小  结

通过例子我们可以看到,基于事件回调的NIO多线程服务器模型,提供了清晰直观的实现方式,可让开发者从NIO及多线程的技术细节中摆脱出来,集中精力关注具体的业务实现。

原文链接:http://www.cnblogs.com/longb/archive/2006/04/04/366800.html

分享到:
评论

相关推荐

    基于事件的 NIO 多线程服务器

    基于事件的 NIO 多线程服务器

    基于事件的_NIO_多线程服务器

    ### 基于事件的NIO多线程服务器解析 #### 概述 在Java的网络编程中,NIO(Non-blocking I/O)作为一种高效的数据处理模式,自JDK 1.4版本引入以来,逐渐成为了开发高性能网络应用的重要工具之一。与传统的阻塞I/O...

    基于时间的NIO多线程服务器

    ### 基于时间的NIO多线程服务器——深入解析与关键技术点 #### 引言 在服务器端编程领域,随着互联网应用的不断发展,如何高效处理大量的并发连接成为了一个重要议题。Java NIO(非阻塞I/O)作为一种先进的I/O处理...

    基于事件的NIO多线程服务器打包

    该包封装过的NIO比sun本身的更容易处理 server中只有区区几行就搞定了: //创建listener TimeHandler timer = new TimeHandler(); //获取Notifier Notifier notifier = Notifier.getNotifier(); //注册监听 notifier....

    使用多线程的NIO构建简易的多线程java服务器

    构建一个基于NIO的多线程服务器,主要步骤如下: 1. **创建ServerSocketChannel**: 首先,我们需要创建一个`ServerSocketChannel`,它是服务器端接收客户端连接的入口。通过`ServerSocketChannel.open()`方法初始...

    Nio多线程CS收发信息问题(问题已经解决)

    总结来说,"Nio多线程CS收发信息问题"涉及到在Java NIO环境下,如何构建一个多线程、高并发的客户端-服务器通信系统,并解决在这个过程中可能遇到的各种挑战。通过理解NIO的核心概念,合理设计线程管理和异常处理...

    java多线程nio服务器

    Java NIO服务器的多线程设计有助于提高服务器的并发性能,特别是在高并发场景下,可以有效地利用系统资源,避免大量线程导致的内存消耗和上下文切换开销。同时,通过选择器的使用,减少了对主线程的占用,使得服务器...

    Java实现基于NIO的多线程Web服务器实例

    Java实现基于NIO的多线程Web服务器实例 Java实现基于NIO的多线程Web服务器实例是使用Java语言基于NIO(New I/O)技术实现的多线程Web服务器。NIO技术可以提供高性能、低延迟的I/O操作,非常适合高并发的Web服务器。...

    基于HTTP、NIO、多线程实现浏览器高并发非阻塞访问服务器文件

    实现功能:基于HTTP协议,解析请求和拼接响应,基于NIO的非阻塞,线程池,文件传输。代码有详细注释和清晰的框架。 程序入口是: /HttpServerReactor/src/com/StartServer.Java 访问1,浏览:...

    基于多线程的web服务器java源码

    【标题】基于多线程的Web服务器Java源码解析 在Web开发领域,服务器是至关重要的组成部分,它们负责处理客户端的HTTP请求并返回相应的HTTP响应。本篇文章将深入探讨一个基于多线程的Web服务器Java源码,帮助你理解...

    基于Java NIO的网络服务器Netty生产实例.zip

    3. **Netty的事件驱动模型**:Netty采用 reactor 模式,通过EventLoopGroup来管理事件循环线程,每个线程负责处理多个连接的事件。当有新连接、读写事件发生时,会触发相应的ChannelHandler进行处理。 4. **Netty的...

    基于多线程的web服务器

    6. **性能优化**:为了最大化效率,多线程服务器可能采用各种策略,如使用非阻塞I/O(NIO)以减少线程等待时间,或者利用异步事件驱动模型(如Reactor模式)来减少线程上下文切换的开销。 然而,多线程模型并非没有...

    多线程web服务器 附实验报告 java

    在IT领域,尤其是在服务器开发中,多线程技术扮演着至关重要的角色,特别是在构建高性能的Web服务器时。本文将深入探讨多线程Web服务器的概念、Java中的Socket编程以及如何通过实现Runnable接口来创建多线程。 多...

    一个基于java nio的简单的http服务器.zip

    1. **Selector(选择器)**:选择器允许单个线程检查多个通道(Channels)上的事件,如连接建立、数据到达等。通过注册通道到选择器,并设置感兴趣的事件类型,我们可以避免创建大量线程来处理每个连接,从而提高...

    基于nio实现的多文件上传源码

    本主题“基于nio实现的多文件上传源码”探讨的是如何利用Java NIO来实现高效的多文件上传功能,尤其对于小文件数量较大的情况。 首先,理解NIO的基本概念是必要的。NIO中的“非阻塞”意味着当数据不可用时,读写...

    基于java的多线程端口扫描软件(源码)

    本项目是基于Java实现的多线程端口扫描软件,旨在利用多线程技术来提高端口扫描的速度和效率。端口扫描是网络安全领域中的一个关键操作,用于发现网络上开放的服务和可能的安全漏洞。 1. **Java多线程** 在Java中...

    多线程精品资源--Java NIO+多线程实现聊天室.zip

    在这个“多线程精品资源--Java NIO+多线程实现聊天室”的压缩包中,我们可以推测它包含了一套关于如何使用Java NIO和多线程技术来创建一个实时聊天应用的教程或示例代码。 首先,多线程是Java中并行处理的基础。...

Global site tag (gtag.js) - Google Analytics