论坛首页 Java企业应用论坛

一Reactor模式和NIO的问题

浏览 5934 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2007-08-02  

请教一技术问题,在Reactor模式和NIO中,当客户端建立一个socket连接,然后以毫秒级的时间间隔向server端发数据,如 for(,,){send(data)} .发现Server端只接收到部分数据,大部分数据丢失。主要代码如下:

java 代码
  1. Server.java   
  2. public class Server implements Runnable {
  3. public Server(int port) throws IOException {   
  4.   
  5.       selector = Selector.open();   
  6.       serverSocket = ServerSocketChannel.open();   
  7.       InetSocketAddress address = new InetSocketAddress(InetAddress   
  8.                 .getLocalHost(), port);   
  9.       serverSocket.socket().bind(address);   
  10.   
  11.       serverSocket.configureBlocking(false);   
  12.         //向selector注册该channel   
  13.         SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);   
  14.         logger.info("-->Start LogServer!");   
  15.         //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor   
  16.         sk.attach(new Acceptor(selector, serverSocket));   
  17. }   
  18.   
  19. public void run() { // normally in a new Thread   
  20. //      Runtime.getRuntime().addShutdownHook(new LogServerShutdown());   
  21.     try {   
  22.        while (!this.stop) {   
  23.             selector.select();   
  24.             Set selected = selector.selectedKeys();   
  25.             Iterator it = selected.iterator();   
  26.             //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。   
  27.             while (it.hasNext())   
  28.                //来一个事件 第一次触发一个accepter线程   
  29.                //以后触发SocketReadHandler   
  30.                 dispatch((SelectionKey) (it.next()));   
  31.             selected.clear();   
  32.        }   
  33.     } catch (IOException ex) {   
  34.        ex.printStackTrace();   
  35.     }   
  36. }   
  37.   
  38.     //运行Acceptor或SocketReadHandler   
  39. private void dispatch(SelectionKey k) {   
  40.         logger.debug("-->in LogServer.dispatch()!");   
  41.         printKeyInfo(k);   
  42.         Runnable r = (Runnable) (k.attachment());   
  43.         if (r != null) {   
  44.             System.out.println(k);   
  45.             r.run();   
  46.         }   
  47.         logger.debug("-->out LogServer.dispatch()!");   
  48. }
  49. }  
 

 

java 代码
  1. Acceptor.java
  2. public class Acceptor implements Runnable {
  3. public Acceptor(Selector selector, ServerSocketChannel ssc) {   
  4.         logger.debug("-->New Acceptor()!");   
  5.         this.selector = selector;   
  6.         this.ssc = ssc;   
  7. }   
  8.   
  9. public void run() {   
  10.         try {   
  11.                logger.debug("-->In Acceptor.run()!");   
  12.                SocketChannel sc = ssc.accept();   
  13.                logger.info("-->Accepted connection from " + sc);   
  14.                if (sc != null) {   
  15. //  sc.configureBlocking(false);   
  16. //  SelectionKey sk = sc.register(selector, 0);   
  17. //  sk.interestOps(SelectionKey.OP_READ);   
  18. //  selector.wakeup();   
  19. //  sk.attach(new SocketDataHandler(selector, sc));   
  20.        
  21.                       new SocketDataHandler(selector, sc);   
  22.                 }   
  23.           } catch (ClosedChannelException e) {                  e.printStackTrace();   
  24.           } catch (IOException e) {   
  25.                     e.printStackTrace();   
  26.           }   
  27.   
  28. }  
  29. }
java 代码
  1. public class SocketDataHandler implements Runnable {   
  2.   
  3.     private final static QueueFactory queueFactory = QueueFactory.getInstance();   
  4.   
  5.     public SocketDataHandler(Selector sel, SocketChannel c) throws IOException {   
  6.         logger.debug("-->New SocketDataHandler()");   
  7.         socket = c;   
  8.         socket.configureBlocking(false);   
  9.         sk = socket.register(sel, 0);   
  10.         //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。   
  11.         //参看dispatch(SelectionKey k)   
  12.         sk.attach(this);   
  13.         //同时将SelectionKey标记为可读,以便读取。   
  14.         sk.interestOps(SelectionKey.OP_READ);   
  15.         sel.wakeup();   
  16.     }   
  17.   
  18.     /*  
  19.      * (non-Javadoc)  
  20.      *   
  21.      * @see java.lang.Runnable#run()  
  22.      */  
  23.     public void run() {   
  24.         logger.debug("-->In SocketDataHandler.run()!");   
  25.         try {   
  26.             readRequest();   
  27.             logger.debug("-->out SocketDataHandler.run()!");   
  28.         } catch (IOException e) {   
  29.             sk.cancel();   
  30.             if (socket.isOpen()) {   
  31.                 try {   
  32.                     socket.close();   
  33.                 } catch (IOException e1) {                     
  34.                     e1.printStackTrace();   
  35.                 }   
  36.             }   
  37.             e.printStackTrace();   
  38.         }   
  39.   
  40.     }   
  41.   
  42.     /**  
  43.      * @throws IOException  
  44.      *    
  45.      */  
  46.     private void readRequest() throws IOException {   
  47.         logger.debug("-->In SocketDataHandler.readRequest()!");   
  48.         ByteBuffer input = ByteBuffer.allocate(4500);   
  49.         input.clear();   
  50.         int length = socket.read(input);   
  51.         if (length == -1) {   
  52.             sk.cancel();   
  53.             if (socket.isOpen()) {   
  54.                 try {   
  55.                     socket.close();   
  56.                 } catch (IOException e1) {     
  57.                     e1.printStackTrace();   
  58.                 }   
  59.             }   
  60.             return;   
  61.         }    
  62.         input.flip();   
  63.         LogProcess sp = new LogProcess(socket, input);   
  64.         WorkQueue wq = queueFactory.getQueue();   
  65.         wq.execute(sp);   
  66.         logger.debug("-->Out SocketDataHandler.readRequest()!");   
  67.     }   
  68.   
  69. }  
java 代码
  1. public class Client() {   
  2.   
  3.    public static void main(String[] args) {   
  4.        Socket sc =  SocketConnection.getSocket(remoteAddress);   
  5.        String data = "test";   
  6.        for(int i=0; i<=10; i++){      
  7.            send(data + i);   
  8.        }   
  9.            
  10.        try {   
  11.            sc.close();   
  12.        } catch (IOException e) {   
  13.             e.printStackTrace();   
  14.        }   
  15.    }   
  16. }  
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics