`
sunlujing
  • 浏览: 179714 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

NIO socket 的简单连接池

    博客分类:
  • j2se
阅读更多

      在最近的项目中,需要写一个socket 与 底层服务器通信的模块。在设计中,请求对象被封装 xxxRequest,消息返回被封装为 xxxResponse. 由于socket的编程开发经验少,一开始我使用了短连接的方式,每个请求建立一个socket通信,由于每个socket只进行一次读写,这大大浪费了系统资源。

      于是考虑使用长连接,系统公用一个client socket 并对send 操作进行加锁,结果在处理并发的时候,各种慢,各种等待。没有办法,考虑使用两节池,预先创建多个 client socket 放入 连接池,需要发送请求时从连接池获取一个socket,完成请求时放入连接池中。下面是一个简单的实现。

       

        private  static String IP=GlobalNames.industryIP;
 private  static int PORT =Integer.parseInt(GlobalNames.industryPort);
 
 private static  int CONNECTION_POOL_SIZE = 10;
 private static NIOConnectionPool self = null;
 private Hashtable<Integer, SocketChannel> socketPool = null; // 连接池
 private boolean[] socketStatusArray = null; // 连接的状态(true-被占用,false-空闲)
 private static Selector selector  = null;
 private static InetSocketAddress SERVER_ADDRESS = null;
 
 /**
  * 初始化连接池,最大TCP连接的数量为10
  *
  * @throws IOException
  */
 public static synchronized void init() throws Exception {
  self = new NIOConnectionPool();
  self.socketPool = new Hashtable<Integer, SocketChannel>();
  self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];
  buildConnectionPool();
 }

 /**
  * 建立连接池
  */
 public synchronized static void buildConnectionPool() throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   SocketChannel client = allocateSocketChannel();
   self.socketPool.put(new Integer(i), client);
   self.socketStatusArray[i] = false;
  }
 }

 /**
  * 从连接池中获取一个空闲的Socket
  *
  * @return 获取的TCP连接
  */
 public static SocketChannel getConnection() throws Exception {
  if (self == null)
   init();
  int i = 0;
  for (i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (!self.socketStatusArray[i]) {
    self.socketStatusArray[i] = true;
    break;
   }
  }
  if (i < CONNECTION_POOL_SIZE) {
   return self.socketPool.get(new Integer(i));
   
  } else {

  //目前连接池无可用连接时只是简单的新建一个连接
   SocketChannel newClient = allocateSocketChannel();
   CONNECTION_POOL_SIZE++;
   self.socketPool.put(CONNECTION_POOL_SIZE, newClient);
   return newClient;
  }
 }

 /**
  * 当获得的socket不可用时,重新获得一个空闲的socket。
  *
  * @param socket
  *            不可用的socket
  * @return 新得到的socket
  * @throws Exception
  */
 public static SocketChannel rebuildConnection(SocketChannel socket)
   throws Exception {
  if (self == null) {
   init();
  }
  SocketChannel newSocket = null;
  try {
   for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
    if (self.socketPool.get(new Integer(i)) == socket) {
     newSocket = allocateSocketChannel();
     self.socketPool.put(new Integer(i), newSocket);
     self.socketStatusArray[i] = true;
    }
   }

  } catch (Exception e) {
   System.out.println("重建连接失败!");
   throw new RuntimeException(e);
  }
  return newSocket;
 }


 /**
  * 将用完的socket放回池中,调整为空闲状态。此时连接并没有断开。
  *
  * @param socket
  *            使用完的socket
  * @throws Exception
  */
 public static void releaseConnection(SocketChannel socket) throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (self.socketPool.get(new Integer(i)) == socket) {
    self.socketStatusArray[i] = false;
    break;
   }
  }
 }

 /**
  * 断开池中所有连接
  *
  * @throws Exception
  */
 public synchronized static void releaseAllConnection() throws Exception {
  if (self == null)
   init();

  // 关闭所有连接
  SocketChannel socket = null;
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   socket = self.socketPool.get(new Integer(i));
   try {
    socket.close();
    self.socketStatusArray[i] = false;
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
   
 
 public static SocketChannel allocateSocketChannel(){
  
   SERVER_ADDRESS = new InetSocketAddress(  
                IP, PORT);  
  SocketChannel socketChannel = null;
     SocketChannel client = null;
     try{
     socketChannel = SocketChannel.open();  
        socketChannel.configureBlocking(false);  
        selector = Selector.open();  
        socketChannel.register(selector, SelectionKey.OP_CONNECT);  
        socketChannel.connect(SERVER_ADDRESS);
        Set<SelectionKey> selectionKeys;  
        Iterator<SelectionKey> iterator;  
        SelectionKey selectionKey;
        selector.select();  
        selectionKeys = selector.selectedKeys();  
        iterator = selectionKeys.iterator();  
        while (iterator.hasNext()) {  
            selectionKey = iterator.next();  
            if (selectionKey.isConnectable()) {  
                client = (SocketChannel) selectionKey.channel();  
                if (client.isConnectionPending()) {  
                    client.finishConnect();
                    client.register(selector, SelectionKey.OP_WRITE);  
                    break;
                }
            }
        }
 }catch(Exception e){
  e.printStackTrace();
 }
 return client;
  }

 public static Selector getSelector() {
  return selector;
 }

 

使用连接池进行通信:

 /*缓冲区大小*/ 
     private static int BLOCK = 8*4096;  
     /*发送数据缓冲区*/ 
     private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);  
   
     /*接受数据缓冲区*/
     private static ByteBuffer protocalNum = ByteBuffer.allocate(4);
     private static ByteBuffer functionNum = ByteBuffer.allocate(4);
     private static ByteBuffer messageLen = ByteBuffer.allocate(4);
     private static ByteBuffer receivebuffer = null;
     private  SocketChannel client = null;
     private Selector selector = null;
    
     private boolean readable = true;
     private boolean writable = true;
    
    
     public NIOSocketBackUp() throws Exception{
      client = NIOConnectionPool.getConnection();
      selector = NIOConnectionPool.getSelector();
     }
    
     public String send(ServiceRequest request) throws Exception {   
             
         Set<SelectionKey> selectionKeys;  
         Iterator<SelectionKey> iterator;  
         SelectionKey selectionKey;  
         int count=0;  
         boolean flag = true;
         String receiveText="";  
          while (flag) {  
             selector.select();  
             //返回此选择器的已选择键集。  
             selectionKeys = selector.selectedKeys();  
             iterator = selectionKeys.iterator();  
             while (iterator.hasNext()) {  
                 selectionKey = iterator.next();  
                 if (selectionKey.isWritable() && (writable)) {  
                         sendbuffer.clear();  
                         sendbuffer.put(request.getProtocalNum());
                         sendbuffer.put(request.getFunctionNum());
                         sendbuffer.put(request.getMessageLen());
                         sendbuffer.put(request.getXmlbytes());
                         sendbuffer.flip();  
                         client.write(sendbuffer);  
                         client.register(selector, SelectionKey.OP_READ);  
                         writable = false;
                 } else if (selectionKey.isReadable() && (readable) ) {  
                     protocalNum.clear();
                     functionNum.clear();
                     messageLen.clear();
                    
                    
                     count=client.read(protocalNum);  
                     count=client.read(functionNum);
                     count=client.read(messageLen);
                     messageLen.rewind();
                     int length = messageLen.asIntBuffer().get(0);
                     receivebuffer = ByteBuffer.allocate(length-12);
                     receivebuffer.clear();
                    
                     //read方式竟然不阻塞
                     int total=0;
                     while(total!=(length-12)){
                       count=client.read(receivebuffer);
                       total+=count;
                     }
                     client.register(selector, SelectionKey.OP_WRITE);  
                     receiveText = new String(receivebuffer.array(),"GBK");
                     flag = false;
                     readable = false;
                     break;
                 }
             }  
         }

      
      NIOConnectionPool.releaseConnection(client);
         return receiveText.trim();
     }  

 

 

 

 

2
1
分享到:
评论
1 楼 zhaoqiubo 2015-08-03  
我理解长连接就是链路要维持着,怎么能一次写入就关闭了呢?你可以再次写入啊,搞不懂为啥要用连接池。望不吝赐教。

相关推荐

    BIO Socket连接池

    在处理大量并发连接时,为了提高性能和资源利用率,通常会引入连接池的概念,这就是"BIO Socket连接池"的核心所在。 BIO( Blocking Input/Output,阻塞I/O)是Java原生I/O模型的一种,它在处理高并发场景时效率较...

    Java实现Socket长连接和短连接

    Socket连接分为两种类型:长连接和短连接。这两种连接方式各有特点,适用于不同的应用场景。 **1. 短连接(Short Connection)** 短连接通常用于一次性、非持久性的通信,如HTTP协议就是典型的短连接。在短连接中...

    socket池,socket

    Socket池和Socket连接池是网络编程中的重要概念,主要用于提高应用程序的性能和效率。在处理大量并发网络连接时,传统的单个Socket连接方式可能会导致系统资源的过度消耗,因此引入了Socket池技术。 Socket,全称是...

    基于 MINA 的 TLS/SSL NIO Socket 实现(二)

    在进行性能优化时,需要注意的是,SSL/TLS的开销会增加CPU和内存使用,因此需要合理配置连接池大小、线程池以及缓冲区大小等参数。MINA允许自定义缓冲区类型,例如使用DirectBuffer来减少内存拷贝,提高效率。 总结...

    一个Socket连接管理器

    1. **连接池**:为了提高性能,管理器通常会使用连接池技术,预先创建并保持一定数量的Socket连接,避免每次客户端请求时都需要新建连接的开销。 2. **并发处理**:管理器需要能同时处理多个客户端的连接请求,这...

    Java网络编程-多线程,连接池

    Java网络编程是构建分布式应用程序的关键技术,特别是在服务器端开发中,多线程和连接池是其核心概念。本文将深入探讨这两个主题,并结合文件传输的实际应用进行讲解。 首先,我们来理解多线程。在Java中,多线程...

    java socket nio 研究

    - 链表或其他数据结构在NIO服务器中的应用,例如维护连接池。 深入理解这些知识点,将有助于你构建自己的高效网络应用程序。同时,参考提供的博客链接,你将能获得更详细的实践指导和理论解析。

    xianchengchi.zip_Socket 线程池_socket池_线程池_线程池socket

    Socket线程池是一种优化策略,用于管理大量的并发Socket连接。本篇将详细探讨Socket线程池的概念、工作原理以及它如何解决多个线程对同一个套接字进行写操作的问题。 首先,我们来理解什么是Socket。Socket是网络上...

    java socket 经典教程

    Java的`java.sql.DriverManager`就提供了连接池的支持,但需要第三方库如Apache Commons Pool来实现Socket连接池。 9. **SSL/TLS安全通信** - Java提供`SSLSocket`和`SSLServerSocket`类支持安全的HTTPS通信,利用...

    Java NIO 国外 PPT 课件(精华)

    NIO可以提高数据库连接池的效率,减少线程等待时间,从而提高整体系统性能。Eran Toch的讲座可能涵盖了使用NIO进行批量数据传输、优化SQL查询等方面。 `session4-extra.ppt`可能是一场技术研讨会的补充材料,详细...

    NIO网络框架 xSocket

    4. **连接管理:**xSocket提供连接池,便于管理客户端与服务器之间的连接,支持连接的创建、关闭以及重用,减少资源消耗。 5. **数据编码解码:**xSocket内置了多种编码解码器,如protobuf、json等,方便进行消息...

    基于socket协议的DDos入侵检测系统

    7. **性能优化**:为了应对大规模的并发连接,系统需要优化资源使用,例如通过设置连接池来复用Socket连接,避免频繁创建和销毁对象。 8. **测试与调试**:模拟DDoS攻击场景进行系统测试,验证其准确性和鲁棒性,...

    java socket多人聊天(文字+图片+文件)

    对于多人聊天,服务端需要维护一个客户端连接池,处理来自多个客户端的消息并广播到其他所有客户端。 3. **客户端实现**:客户端通过`Socket`连接到服务端的指定端口,建立通信链路。每个客户端都有自己的Socket...

    NIO学习总结经典

    通过学习和实践NIO,开发者可以构建高并发、低延迟的服务器应用,如网络服务器、大型数据库连接池等。同时,了解NIO的原理也有助于理解Java的全双工通信机制,如Java NIO.2(即Java 7引入的Channel和Selector的增强...

    Java实现基于TCP协议的Socket通信

    这需要维护一个客户端连接池,并且每个客户端连接都需要开启一个新的线程来处理,以避免阻塞。例如: ```java import java.io.*; import java.net.*; import java.util.*; public class ServerWithClients { ...

    聊天室基础项目资料_socket聊天室_聊天室_java项目_socket_

    在这个项目中,开发者将学习如何利用Java的Socket类来创建客户端和服务器端的连接,实现实时的数据传输,构建一个简单的聊天环境。 【描述】"TCP/IP通信,使用Socket编程,java即时聊天室" 揭示了项目的重点在于...

    Androidsocket通讯demo

    在Android平台上进行网络编程时,Socket通信是一种常见且重要的技术,用于实现客户端和...在实际开发中,可能还需要根据具体需求进行更复杂的设计,例如心跳检测、连接池管理、多路复用等,以提升应用的稳定性和效率。

    Socket TCP应用实例

    9. **性能优化**:在大量并发连接的情况下,可以通过复用已关闭的Socket连接(TCP连接池)、减少不必要的数据拷贝、选择合适的缓冲区大小等方式优化性能。 10. **安全考虑**:在实际应用中,TCP通信可能会涉及敏感...

    rewin.zwgtools.code.jar

    rewin.zwgtools.code....读写xml文档,图片管理,Ftp管理,可以和一个Ftp服务端建立连接实现文件的上传下载,TSF服务提供,两个数据库连接池管理,Socket连接池的管理,等等,皆在为开发人员提供高度集成的开发工具。

    concurrent and nio

    例如,使用NIO的Selector可以在一个线程中管理多个Socket连接,而线程池可以有效地管理这些连接的处理,避免了因大量线程导致的资源浪费。 **源码分析** 博客中提到的“源码”标签可能意味着文章会深入到Java并发...

Global site tag (gtag.js) - Google Analytics