`
冰封之月
  • 浏览: 18060 次
社区版块
存档分类
最新评论

java socket连接池 实现

阅读更多
公司目前要求做一个能够并发负载很高的webservice,使用了市面流行的框架例如cxf,axis等效果都不理想,即利用jmeter压力测试时,tps浮动较大,分析原因为每次请求webservice接口都会建立一个socket,如果超过最大端口数,那么就要等待原来的socket释放才能新建socket,所以想到了用socket连接池。

即利用socket发送http请求,可以说是实现了http的长连接

本人也是第一次写socket的连接池,所以把代码发出来,希望有共同研究方向的同行能帮我指正不足之处。

1.池成员数据结构
package com.socket.pool;

import java.net.Socket;

public class SocketMember {
    //socket对象
    private Socket socket;
    //是否正被使用
    private boolean inUse=false;
    public Socket getSocket() {
        return socket;
    }
    public void setSocket(Socket socket) {
        this.socket = socket;
    }
    public boolean isInUse() {
        return inUse;
    }
    public void setInUse(boolean inUse) {
        this.inUse = inUse;
    }
    
}



2.socket连接池类
package com.socket.pool;

import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class SocketPool {
//host
private String host;
//port
private int port;
//初始化socket数
private int initSize=5;
//最大socket数
private int maxSize=5;
//socket对象容器
private List<SocketMember> socketContainer=new ArrayList<SocketMember>(initSize);

public SocketPool(String host,int port){
    this.host=host;
    this.port=port;
    buildPoolPart();
}
//给socket容器增加成员 一次增加initSize个成员
private List<SocketMember> buildPoolPart(){
    List<SocketMember> poolPart=new ArrayList<SocketMember>(initSize);
    SocketMember member=null;
    for(int i=0;i<initSize;i++){
        Socket socket=null;
        try {
            socket = new Socket(host, port);       
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        member=new SocketMember();
        member.setSocket(socket);
        poolPart.add(member);
    }
    if(poolPart.size()>0){
    socketContainer.addAll(poolPart);
    }else{
        try {
            throw new Exception("扩大池容量失败");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
    return poolPart;
}
//获取池中空闲的socket 已做线程安全处理
public SocketMember getMemberFromPool(){
    SocketMember member=null;
    //同步块获取对象
    synchronized (this) {
         for(int i=0;i<socketContainer.size();i++){
             SocketMember memberInPool=socketContainer.get(i);
             boolean inUse=memberInPool.isInUse();
             if(inUse==false){
                 memberInPool.setInUse(true);
                 member=memberInPool;
                 System.out.println("成功获取对象,在池中的位置为:"+i);
                 break;
             }
         }
         //pool中没有空余
         if(member==null){
             if(socketContainer.size()<maxSize){    
                 //扩大池容量
                 List<SocketMember> newPoolPart=buildPoolPart();
                 //从新扩大的部分拿出一个来用
                 member=newPoolPart.get(0);
                 member.setInUse(true);
                 System.out.println("成功扩大池容量,当前size为:"+socketContainer.size());
             }
         }
    }
     //如果超过最大容量 等待 递归
     if(member==null){
         try {
             Thread.sleep(1000);
         }
         catch (InterruptedException e) {
             e.printStackTrace();
         }
         member=getMemberFromPool();
     }
     return member;
     
 }

public int getInitSize() {
    return initSize;
}

public void setInitSize(int initSize) {
    this.initSize = initSize;
}

public int getMaxSize() {
    return maxSize;
}

public void setMaxSize(int maxSize) {
    this.maxSize = maxSize;
}
public List<SocketMember> getSocketContainer() {
    return socketContainer;
}
public String getHost() {
    return host;
}
public int getPort() {
    return port;
}



}


3.守护线程
package com.socket.pool;

import java.io.IOException;
import java.net.Socket;
import java.util.List;
/**
 * 保持池中的socket一直存活 每隔一段时间遍历池检查socket对象 如果已连接那么发送心跳包 如果已断开 那么重新建立连接
  * @ClassName: DaemonThread
  * @Description: TODO
  * @Copyright: Copyright (c) 2016 
  * @Company: 深圳市梦网科技股份有限公司
  * @author hxq
  * @date 2016-8-1 上午10:26:43
  * @version V1.0
 */
public class DaemonThread implements Runnable{
    private SocketPool pool;
    public DaemonThread(SocketPool pool){
        this.pool=pool;
    }
    public void run() {
        List<SocketMember> container=pool.getSocketContainer();
        while(true){
            //10秒检查一次    
            try {
                Thread.sleep(10000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            //遍历检查是否连接 
            for(int i=0;i<container.size();i++){
                SocketMember member=container.get(i);
                Socket socket=member.getSocket();
                //给此socket加锁
                synchronized (socket) {
                    if(!member.isInUse()){
                        
                        if(socket.isConnected()){
                            //如果连接发送心跳包
                            KeepAliveExcutor excutor=new KeepAliveExcutor(member);
                            excutor.request();
                            
                        }else{
                            //如果失败重新建立socket
                            try {
                                socket=new Socket(pool.getHost(), pool.getPort());
                                member.setSocket(socket);
                            }
                            catch (IOException e) {
                                e.printStackTrace();
                            }
                            System.out.println(socket.getLocalPort()+" 断线重连");
                        }
                    }
                }
                //System.out.println(socket.getLocalPort()+" 状态"+socket.isConnected());  
            }
        }
        
    }

}
/**
 * 发送心跳包的executor
  * @ClassName: KeepAliveExcutor
  * @Description: TODO
  * @Copyright: Copyright (c) 2016 
  * @Company: 深圳市梦网科技股份有限公司
  * @author hxq
  * @date 2016-8-16 下午02:16:32
  * @version V1.0
 */
class KeepAliveExcutor extends AbstractSocketExecutor{

    public KeepAliveExcutor(SocketMember socketMember) {
        super(socketMember);
    }
    
    public void request(){ 
        String request=SocketPoolUtil.getNowTimestamp();
        super.request(request, "utf-8");
    }
    
}




5.抽象的业务执行类 因为通过socket能发送http tcp等 所以写了一个抽象类封装公共的方法
package com.socket.pool;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;


public abstract class AbstractSocketExecutor {
    protected SocketMember socketMember=null;
    protected Socket socket=null;
    protected String host;
    protected int port;
    
    public AbstractSocketExecutor(SocketMember socketMember){
        this.socketMember=socketMember;
        if(socketMember!=null){
            this.socket=socketMember.getSocket();
        }
        host=socket.getInetAddress().getHostName();
        port=socket.getPort();
    }
    /**
     * 发送请求
      *
      * @param request
      * @param charset    
      *
      * @Description: TODO
     */
    protected void request(String request,String charset){
        OutputStream out=null;
        try {
            out=socket.getOutputStream();
            out.write(request.getBytes(charset));    
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 返回响应stream
      *
      * @return    
      *
      * @Description: TODO
     */
    protected InputStream getResStream(){
        InputStream in=null;
        try {
            in = socket.getInputStream();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return in;
    }
    /**
     * 设置状态为未使用
      *    
      *
      * @Description: TODO
     */
    public void back(){
        //不能关闭流,否则socket会被关闭
/*        try {
            socket.getOutputStream().close();
            socket.getInputStream().close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }*/
        socketMember.setInUse(false);
    }
}



6.抓取html页面的executor
package com.socket.pool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * 发送http请求 抓取html页面内容 写的并不好 尤其是判断结束的方式 勉强能用。。。
  * @ClassName: HTMLSocketExecutor
  * @Description: TODO
  * @Copyright: Copyright (c) 2016 
  * @Company: 深圳市梦网科技股份有限公司
  * @author hxq
  * @date 2016-8-16 下午02:02:47
  * @version V1.0
 */
public class HTMLSocketExecutor extends AbstractSocketExecutor {
    private String charset="UTF-8";
    public HTMLSocketExecutor(SocketMember socketMember) {
        super(socketMember);
    }
    
    public void setRequest(String path){
        StringBuilder requestStr=new StringBuilder();
        requestStr.append("GET ");
        requestStr.append(path);
        requestStr.append(" HTTP/1.1\r\n");
        requestStr.append("User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.94 Safari/537.36\r\n");
        requestStr.append("Host: ");
        requestStr.append(super.host);
        if(port!=80){
            requestStr.append(":");
            requestStr.append(super.port);
        }
        requestStr.append("\r\n");
        requestStr.append("\r\n");
        super.request(requestStr.toString(), charset);
    }
    
    public String getResponse(){
        StringBuilder responseResult=new StringBuilder();
        BufferedReader socketReader=null;
        try {
            socketReader=new BufferedReader(new InputStreamReader(super.getResStream(), charset)) ;
            String line=null;
            char[] body=null;
            int contentLength=-1;
            boolean isHeadEnd=false;
            while((line=socketReader.readLine())!=null){
                //System.out.println(line);
                responseResult.append(line);
                responseResult.append("\r\n");
                //检查请求头是否结束
                if(!isHeadEnd){
                    isHeadEnd=isBlankLine(line);
                    if(isHeadEnd){
                        //立即判断是否为请求头结束 如果是且有content-length 那么立即一次读取body 结束循环
                        if(contentLength!=-1){
                            if(contentLength==0){
                                break;
                            }
                            //如果content-length>0 那么一次性读出响应体   
                            body=new char[contentLength];
                            int bodyReadEndFlag=socketReader.read(body);
                            responseResult.append(body);
                            if(bodyReadEndFlag==-1){
                                break;
                            }
                            
                        }
                    }
                    if(contentLength==-1){
                        //没有赋值 检查content-length并赋值
                        contentLength=getContentLength(line);
                    }
                }else{
                    //已经读过了head结束行
                    if(isHTMLEnd(line)){
                        //如果碰到</html>
                        break;
                    }
                }

            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }finally{
            super.back();
        }
        
        return responseResult.toString();

    }
    
    private int getContentLength(String line){
        int result=-1;
        if(line.contains("Content-Length")){
            int splitIndex=line.indexOf(":");
            String length=line.substring(splitIndex+1,line.length());
            length=length.trim();
            result=Integer.valueOf(length);
        }
        return result;
    }
    
    private boolean isBlankLine(String line){
        boolean result=false;
        if("".equals(line)){
            result=true;
        }
        return result;
    }
    
    private boolean isHTMLEnd(String line){
        boolean result=false;
        if(line.indexOf("</html>")>-1){
            result=true;
        }
        return result;
    }


}


7.工厂类 用于组装exector对象 实例化executor对象 并将socket对象注入进去 (没有抽象,只能服务HTMLSocketExecutor 。。。 先凑合着用)
package com.socket.pool;


public class SocketExcutorFactory {
/*    private SocketExcutorFactory(){}
    private final static SocketExcutorFactory instance=new SocketExcutorFactory();
    public static SocketExcutorFactory getInstance(){
        return instance;
    }*/
    
    private SocketPool pool=null;
    public SocketExcutorFactory(String host,int port,boolean daemonFlag){
        pool=new SocketPool(host, port);
        if(daemonFlag){
        //守护线程
        Thread daemonThread=new Thread(new DaemonThread(pool));
        daemonThread.start();
        }
    }
    public SocketExcutorFactory(String host,int port){
        this(host, port, true);
    }
    /**
     * 实例化executor 并从池中拿出一个socket注进去
      *
      * @return    
      *
      * @Description: TODO
     */
    public HTMLSocketExecutor getHttpExecutor(){
        HTMLSocketExecutor executor=null;
        executor=new HTMLSocketExecutor(pool.getMemberFromPool());
        return executor;
    }
    

}




单线程测试类
package com.socket.pool;

public class TestPool {
public static void main(String[] args) {
String host="www.cnblogs.com";
int port=80;
SocketExcutorFactory factory=new SocketExcutorFactory(host, port,false);
HTMLSocketExecutor executor=factory.getHttpExecutor();
executor.setRequest("/zhangweizhong/p/5772330.html");
String res=executor.getResponse();
System.out.println(res);
}
}



多线程测试
package com.socket.pool;

public class ExcutorThread implements Runnable {
    HTMLSocketExecutor excutor=null;
    private static SocketExcutorFactory factory=new SocketExcutorFactory("www.baidu.com", 80);
    public ExcutorThread(){
        excutor=factory.getHttpExecutor();
    }
    public void run() {
        excutor.setRequest("/");
        String res=excutor.getResponse();
        if(res.contains("302 Found")){
            System.out.println(Thread.currentThread().getName()+" success");
        }else{
            System.out.println(Thread.currentThread().getName()+" fail");
        }

    }

public static void main(String[] args) {
    Thread thread=null;
    for(int i=0;i<10;i++){
      thread=new Thread(new ExcutorThread());
      thread.start();
  }
}

}


源码:
分享到:
评论

相关推荐

    java socket连接池

    本文将深入探讨Java中的Socket连接池及其实现原理。 首先,理解Socket的基础知识至关重要。Socket是网络编程的基本接口,它提供了进程间通信(IPC)的能力,尤其是在互联网上不同主机间的通信。Java中的Socket类和...

    socket 客户端连接池实现

    在Java中,可以使用Apache Commons Pool库来实现Socket连接池,或者自定义一个基于LinkedList或ConcurrentHashMap的数据结构来管理和维护连接。同时,可以结合JMX(Java Management Extensions)进行监控,查看连接...

    Socket连接池的简单应用

    为此,本文档介绍了一种解决这些问题的方法——Socket连接池技术,并通过具体实例来展示如何在客户端和服务端之间实现这一技术。 #### 二、Socket连接的基础概念 在深入探讨Socket连接池之前,我们需要了解两种...

    Java实现Socket长连接和短连接

    长连接的实现可能涉及到线程池、连接池等技术,以及异常处理和超时策略。 **3. 实现原理** - **Java Socket API**:Java提供了Socket和ServerSocket类来创建和管理TCP连接。通过ServerSocket监听特定端口,等待...

    Socket连接池的经典实例

    一个java socket连接池的典型实例 SocketClient,客户端测试类 SocketAdapter继承Socket类,重新封装socket类 SocketConnectionPool,连接池管理类 StartupSocketServer,socket服务器端的主线程,负责监听端口,当有...

    socket线程连接池实例

    本实例探讨的是如何利用Java中的Socket对象以及线程连接池技术,特别是`GenericObjectPool`来提高程序性能和效率。首先,我们需要理解Socket和线程连接池的基本概念。 **Socket**: Socket是网络通信的一种接口,它...

    如何解决线程太多导致java socket连接池出现的问题

    解决线程太多导致Java Socket连接池出现的问题 线程太多对Socket连接池的影响 在Java应用程序中,线程太多可能会导致Socket连接池出现问题。这是因为每个线程都需要占用一定的系统资源,如内存、CPU、Socket 等。...

    BIO Socket连接池

    `SocketServerPool`可能是一个服务器端的Socket连接池实现,它的主要职责包括: 1. 创建ServerSocket,监听特定端口,用于接收客户端的连接请求。 2. 当有新的客户端连接时,从连接池中取出一个Socket连接,与客户端...

    socket连接池加消息队列源码

    Socket连接池和消息队列是两种在分布式系统和网络编程中常见的技术,它们在提高系统性能、优化资源管理和实现异步通信方面起着至关重要的作用。在这个“socket连接池加消息队列源码”中,我们可以看到作者创建了一个...

    socket池,socket

    Socket池和Socket连接池是网络编程中的重要概念,主要用于提高应用程序的性能和效率。在处理大量并发网络连接时,传统的单个Socket连接方式可能会导致系统资源的过度消耗,因此引入了Socket池技术。 Socket,全称是...

    java socket 经典教程

    Java的`java.sql.DriverManager`就提供了连接池的支持,但需要第三方库如Apache Commons Pool来实现Socket连接池。 9. **SSL/TLS安全通信** - Java提供`SSLSocket`和`SSLServerSocket`类支持安全的HTTPS通信,利用...

    java Socket通信实现.rar

    例如,可能有一个预封装好的`SocketUtil`类,包含了建立Socket连接、发送消息、接收消息等常用方法。这样的设计使得开发者在处理Socket通信时只需调用几个简单的API,而无需关心底层的实现细节。 此外,考虑到文件...

    Java的socket长连接实例

    这个类可能负责管理和维护Socket连接池,它可能包含以下功能: 1. 创建Socket连接:根据服务器地址和端口号创建Socket实例。 2. 连接管理:保存和检索已建立的Socket连接,避免频繁创建和销毁。 3. 连接池维护:...

    java Socket多线程通讯实例

    本实例探讨的是如何利用Java的Socket实现TCP(Transmission Control Protocol)协议下的多线程通讯,允许一个服务端同时处理多个客户端的连接请求。 首先,TCP是一种面向连接的、可靠的、基于字节流的传输层通信...

    数据库连接池BoneCP源码分析报告

    BoneCP是一款高效、轻量级的Java数据库连接池实现,它的源码分析对于我们理解数据库连接池的工作原理,优化数据库性能以及进行二次开发具有重要意义。 首先,我们需要了解数据库连接池的基本概念。数据库连接池是...

    IBM Java socket教程

    在Java中,`java.sql.ConnectionPoolDataSource`和`javax.sql.PooledConnection`接口用于数据库连接池,而对于Socket连接池,开发者可以使用第三方库如Apache Commons Pool或HikariCP。连接池的主要优点是减少资源...

    JAVA socket 通信

    为了实现这些,服务器需要维护一个用户连接池,记录每个Socket连接对应的用户信息,以便正确地路由消息。 在好友列表功能中,用户可以添加、删除或查找好友。这些操作都需要通过Socket通信在客户端和服务器之间传递...

    Java网络编程-多线程,连接池

    在Java中,如Apache Commons DBCP或HikariCP等库提供了数据库连接池的实现。连接池预先初始化一定数量的数据库连接,当应用需要时可以从池中获取,用完后归还,避免了频繁的创建和销毁连接带来的开销。对于网络编程...

    mongo设置连接池

    然而,在MongoDB中,连接池的实现方式有所不同。 在关系型数据库中,连接池通常指的是预先创建多个数据库连接,并将这些连接组织在一起,以便应用程序可以高效地获取和释放连接资源。这种方式能够显著减少连接...

Global site tag (gtag.js) - Google Analytics