`
Donald_Draper
  • 浏览: 984080 次
社区版块
存档分类
最新评论

Mysql负载均衡连接的获取

    博客分类:
  • JDBC
阅读更多
Java动态代理:http://www.cnblogs.com/xiaoluo501395377/p/3383130.html
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
if(StringUtils.startsWithIgnoreCase(url, "jdbc:mysql:loadbalance://"))
       return connectLoadBalanced(url, info);

//NonRegisteringDriver
//负载均衡连接获取方法
private Connection connectLoadBalanced(String url, Properties info)
        throws SQLException
    {
        //解析url
        Properties parsedProps = parseURL(url, info);
        parsedProps.remove("roundRobinLoadBalance");
        if(parsedProps == null)
            return null;
        String hostValues = parsedProps.getProperty("HOST");
        List hostList = null;
        if(hostValues != null)
            hostList = StringUtils.split(hostValues, ",", true);
        if(hostList == null)
        {
            hostList = new ArrayList();
            hostList.add("localhost:3306");
        }
	//构造负载均衡连接代理
        LoadBalancingConnectionProxy proxyBal = new LoadBalancingConnectionProxy(hostList, parsedProps);
	//通过代理新建代理实例Connection
        return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {
            java.sql.Connection.class
        }, proxyBal);
    }

//LoadBalancingConnectionProxy
public class LoadBalancingConnectionProxy
    implements InvocationHandler, PingTarget
{
    private static Method getLocalTimeMethod;
    public static final String BLACKLIST_TIMEOUT_PROPERTY_KEY = "loadBalanceBlacklistTimeout";
    private Connection currentConn;
    private List hostList;
    private Map liveConnections;
    private Map connectionsToHostsMap;
    private long responseTimes[];
    private Map hostsToListIndexMap;
    private boolean inTransaction;
    private long transactionStartTime;
    private Properties localProps;
    private boolean isClosed;
    private BalanceStrategy balancer;//负载均衡策略
    private int retriesAllDown;
    private static Map globalBlacklist = new HashMap();
    private int globalBlacklistTimeout;

    static 
    {
        try
        {
            getLocalTimeMethod = (java.lang.System.class).getMethod("nanoTime", new Class[0]);
        }
        catch(SecurityException e) { }
        catch(NoSuchMethodException e) { }
    }
    LoadBalancingConnectionProxy(List hosts, Properties props)
        throws SQLException
    {
        inTransaction = false;
        transactionStartTime = 0L;
        isClosed = false;
        globalBlacklistTimeout = 0;
        hostList = hosts;
        int numHosts = hostList.size();
	//存活连接
        liveConnections = new HashMap(numHosts);
        connectionsToHostsMap = new HashMap(numHosts);
	//Host连接的相应时间
        responseTimes = new long[numHosts];
        hostsToListIndexMap = new HashMap(numHosts);
        for(int i = 0; i < numHosts; i++)
            hostsToListIndexMap.put(hostList.get(i), new Integer(i));

        localProps = (Properties)props.clone();
        localProps.remove("HOST");
        localProps.remove("PORT");
        localProps.setProperty("useLocalSessionState", "true");
        String strategy = localProps.getProperty("loadBalanceStrategy", "random");
        String retriesAllDownAsString = localProps.getProperty("retriesAllDown", "120");
        try
        {
            retriesAllDown = Integer.parseInt(retriesAllDownAsString);
        }
        catch(NumberFormatException nfe)
        {
            throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForRetriesAllDown", new Object[] {
                retriesAllDownAsString
            }), "S1009", null);
        }
        String blacklistTimeoutAsString = localProps.getProperty("loadBalanceBlacklistTimeout", "0");
        try
        {
            globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString);
        }
        catch(NumberFormatException nfe)
        {
            throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] {
                retriesAllDownAsString
            }), "S1009", null);
        }
	//构建负载均衡策略
        if("random".equals(strategy))
            balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        else
        if("bestResponseTime".equals(strategy))
            balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        else
            balancer = (BalanceStrategy)Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0);
        //初始化负载均衡器
	balancer.init(null, props);
	//从负载均衡器获取连接
        pickNewConnection();
    }
}

下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
public static List loadExtensions(Connection conn, Properties props, String extensionClassNames, String errorMessageKey, ExceptionInterceptor exceptionInterceptor)
        throws SQLException
    {
        List extensionList = new LinkedList();
        List interceptorsToCreate = StringUtils.split(extensionClassNames, ",", true);
        Iterator iter = interceptorsToCreate.iterator();
        String className = null;
        try
        {
            Extension extensionInstance;
            for(; iter.hasNext(); extensionList.add(extensionInstance))
            {
                className = iter.next().toString();
		//加载className
                extensionInstance = (Extension)Class.forName(className).newInstance();
		//初始化class
                extensionInstance.init(conn, props);
            }

        }
        catch(Throwable t)
        {
            SQLException sqlEx = SQLError.createSQLException(Messages.getString(errorMessageKey, new Object[] {
                className
            }), exceptionInterceptor);
            sqlEx.initCause(t);
            throw sqlEx;
        }
        return extensionList;
    }

再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
public class BestResponseTimeBalanceStrategy
    implements BalanceStrategy
{
   public void init(Connection connection, Properties properties)
        throws SQLException
    {
      //初始化为空,待扩展
    }
}


回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
rivate synchronized void pickNewConnection()
        throws SQLException
    {
        if(currentConn == null)
        {
            currentConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown);
            return;
        } else
        {
            Connection newConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown);
            newConn.setTransactionIsolation(currentConn.getTransactionIsolation());
            newConn.setAutoCommit(currentConn.getAutoCommit());
            currentConn = newConn;
            return;
        }
    }

查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
 
public Connection pickConnection(LoadBalancingConnectionProxy proxy, List configuredHosts, Map liveConnections, long responseTimes[], int numRetries)
        throws SQLException
    {
        SQLException ex;
label0:
        {
            Map blackList = proxy.getGlobalBlacklist();
            ex = null;
            int attempts = 0;
            Connection conn;
            do
            {
                if(attempts >= numRetries)
                    break label0;
                long minResponseTime = 9223372036854775807L;
                int bestHostIndex = 0;
		//获取代理host黑名单
                if(blackList.size() == configuredHosts.size())
                    blackList = proxy.getGlobalBlacklist();
		//从responseTimes筛选出相应时间最小的host索引index
                for(int i = 0; i < responseTimes.length; i++)
                {
                    long candidateResponseTime = responseTimes[i];
                    if(candidateResponseTime >= minResponseTime || blackList.containsKey(configuredHosts.get(i)))
                        continue;
                    if(candidateResponseTime == 0L)
                    {
                        bestHostIndex = i;
                        break;
                    }
                    bestHostIndex = i;
                    minResponseTime = candidateResponseTime;
                }
                //从configuredHosts获取host
                String bestHost = (String)configuredHosts.get(bestHostIndex);
		//从liveConnections获取连接
                conn = (Connection)liveConnections.get(bestHost);
                if(conn != null)
                    break;
                try
                {
		    //如果liveConnections不存在host对应的连接,则通过代理去创建一个连接
                    conn = proxy.createConnectionForHost(bestHost);
                    break;
                }
                catch(SQLException sqlEx)
                {
                    ex = sqlEx;
                    if((sqlEx instanceof CommunicationsException) || "08S01".equals(sqlEx.getSQLState()))
                    {
		        //如果创建连接异常,则加入黑名单
                        proxy.addToGlobalBlacklist(bestHost);
                        blackList.put(bestHost, null);
                        if(blackList.size() == configuredHosts.size())
                        {
                            attempts++;
                            try
                            {
                                Thread.sleep(250L);
                            }
                            catch(InterruptedException e) { }
                            blackList = proxy.getGlobalBlacklist();
                        }
                    } else
                    {
                        throw sqlEx;
                    }
                }
            } while(true);
            return conn;
        }
        if(ex != null)
            throw ex;
        else
            return null;
    }

来看LoadBalancingConnectionProxy创建连接:
public synchronized Connection createConnectionForHost(String hostPortSpec)
        throws SQLException
    {
        Properties connProps = (Properties)localProps.clone();
        String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(hostPortSpec);
        if(hostPortPair[1] == null)
            hostPortPair[1] = "3306";
        connProps.setProperty("HOST", hostPortSpec);
        connProps.setProperty("PORT", hostPortPair[1]);
	//返回的实际为ConnectionImpl
        Connection conn = ConnectionImpl.getInstance(hostPortSpec, Integer.parseInt(hostPortPair[1]), connProps, connProps.getProperty("DBNAME"), "jdbc:mysql://" + hostPortPair[0] + ":" + hostPortPair[1] + "/");
        liveConnections.put(hostPortSpec, conn);
        connectionsToHostsMap.put(conn, hostPortSpec);
        return conn;
    }

在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {
           java.sql.Connection.class
        }, proxyBal);

//Proxy
public static Object newProxyInstance(ClassLoader loader,
					  Class<?>[] interfaces,
					  InvocationHandler h)
	throws IllegalArgumentException
    {
	if (h == null) {
	    throw new NullPointerException();
	}

	/*
	 * Look up or generate the designated proxy class.
	 */
        Class<?> cl = getProxyClass0(loader, interfaces); // stack walk magic: do not refactor

	/*
	 * Invoke its constructor with the designated invocation handler.
	 */
	try {
            final Constructor<?> cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            SecurityManager sm = System.getSecurityManager();
            if (sm != null && ProxyAccessHelper.needsNewInstanceCheck(cl)) {
                // create proxy instance with doPrivilege as the proxy class may
                // implement non-public interfaces that requires a special permission
                return AccessController.doPrivileged(new PrivilegedAction<Object>() {
                    public Object run() {
		        //创建实例
                        return newInstance(cons, ih);
                    }
                });
            } else {
                return newInstance(cons, ih);
            }
	} catch (NoSuchMethodException e) {
	    throw new InternalError(e.toString());
	} 
    }
//创建实例
private static Object newInstance(Constructor<?> cons, InvocationHandler h) {
        try {
            return cons.newInstance(new Object[] {h} );
        } catch (IllegalAccessException e) {
            throw new InternalError(e.toString());
        } catch (InstantiationException e) {
            throw new InternalError(e.toString());
        } catch (InvocationTargetException e) {
            Throwable t = e.getCause();
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new InternalError(t.toString());
            }
        }
    }

回到LoadBalancingConnectionProxy
public Object invoke(Object proxy, Method method, Object args[])
        throws Throwable
    {
        String methodName = method.getName();
        if("equals".equals(methodName) && args.length == 1)
            if(args[0] instanceof Proxy)
                return Boolean.valueOf(((Proxy)args[0]).equals(this));
            else
                return Boolean.valueOf(equals(args[0]));
        if("close".equals(methodName))
        {
            synchronized(this)
            {
                for(Iterator allConnections = liveConnections.values().iterator(); allConnections.hasNext(); ((Connection)allConnections.next()).close());
                if(!isClosed)
                    balancer.destroy();
                liveConnections.clear();
                connectionsToHostsMap.clear();
            }
            return null;
        }
        if("isClosed".equals(methodName))
            return Boolean.valueOf(isClosed);
        if(isClosed)
            throw SQLError.createSQLException("No operations allowed after connection closed.", "08003", null);
        if(!inTransaction)
        {
            inTransaction = true;
            transactionStartTime = getLocalTimeBestResolution();
        }
        Object result = null;
        try
        {
	    //关键在这里,当调用Connection的方法是,实际上调用的currentConn的对应方法
	    //这个currentConn我们前面有说
            result = method.invoke(currentConn, args);
            if(result != null)
            {
                if(result instanceof Statement)
                    ((Statement)result).setPingTarget(this);
                result = proxyIfInterfaceIsJdbc(result, result.getClass());
            }
        }
        catch(InvocationTargetException e)
        {
            dealWithInvocationException(e);
        }
        finally
        {
            if("commit".equals(methodName) || "rollback".equals(methodName))
            {
                inTransaction = false;
                String host = (String)connectionsToHostsMap.get(currentConn);
                if(host != null)
                {
                    int hostIndex = ((Integer)hostsToListIndexMap.get(host)).intValue();
                    synchronized(responseTimes)
                    {
                        responseTimes[hostIndex] = getLocalTimeBestResolution() - transactionStartTime;
                    }
                }
                pickNewConnection();
            }
        }
        return result;
    }

这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
分享到:
评论

相关推荐

    windows下mysql主备双向复制与mycat负载均衡

    在Windows环境下搭建MySQL主备双向复制以及Mycat负载均衡,是一种常见的数据库架构策略,用于提高数据的可用性、一致性和系统的负载均衡能力。以下详细的知识点说明了这一过程: 1. MySQL主备双向复制的概念和作用...

    nginx负载均衡配置文件实例

    通过不同的负载均衡策略,如轮询、权重、最少连接数等,Nginx可以在服务器集群中智能地分配请求,保证服务的稳定性和效率。 ### 基本配置结构 一个基本的Nginx负载均衡配置通常包含以下部分: 1. **upstream块**...

    高可用性、负载均衡的mysql集群解决方案.docx

    MySQL 高可用性、负载均衡解决方案 MySQL 是世界上最流行的开源数据库,已有1100多万的安装,每天超过五万的下载。MySQL 为全球开发者、DBA 和 IT 管理者在可靠性、性能、易用性方面提供了选择。第三方市场调查...

    基于MySQL的负载均衡的搭建与研究.pdf

    为了提升数据库系统的效率和稳定性,通常会采用负载均衡策略,构建多台MySQL服务器集群。本文主要探讨了在RedHat9.0操作系统上搭建MySQL服务器集群的方法以及应对主从服务器故障的解决方案。 首先,搭建服务器集群...

    MySQL主从复制实现高可用性和负载均衡.pdf

    MySQL主从复制是一种常见的数据库高可用性和负载均衡解决方案,它通过在主库和从库之间同步数据,实现读写分离、容灾冗余和负载均衡,从而提高系统的整体性能和稳定性。 1. 读写分离:主从复制的核心是将读写操作...

    Haproxy+多台MySQL从服务器(Slave)实现负载均衡.docx

    配置Haproxy进行MySQL负载均衡的步骤大致如下: 1. 安装Haproxy:通常通过包管理器如`apt-get`或`yum`在Linux系统中安装。 2. 配置Haproxy:编辑`/etc/haproxy/haproxy.cfg`,设置全局和默认参数,以及定义监听器...

    MySQL Proxy 快速实现读写分离以及负载均衡

    ### MySQL Proxy 快速实现读写分离以及负载均衡 #### 一、概述 在数据库管理领域,MySQL Proxy 是一个开源的代理服务器,它为 MySQL 提供了一层中间件,能够帮助用户实现数据库的读写分离、负载均衡等功能,进而...

    具有负载均衡功能的MySQL服务器集群部署及实现

    【MySQL服务器集群负载均衡】 MySQL是一个高性能、多线程的关系型数据库管理系统,广泛应用于各种平台,具有优秀的可扩展性。在大规模的生产环境中,通过部署带有负载均衡功能的MySQL服务器集群,可以显著提升...

    基于haproxy构建负载均衡集群.docx

    4. 支持 TCP 协议的负载均衡转发,可以对 MySQL 读进行负载均衡,对后端的 MySQL 节点进行检测和负载均衡,可以用 LVS+Keepalived 对 MySQL 主从做负载均衡。 HAProxy 负载均衡策略非常多,HAProxy 的负载均衡算法...

    无标题Keepalived+Nginx+Tomcat+MySQL部署双机热备、负载均衡应用服务器

    ### 无标题Keepalived+Nginx+Tomcat+MySQL部署双机热备、负载均衡应用服务器 #### 配置概述 本文档旨在详细介绍如何在Linux环境下构建一套基于Keepalived、Nginx、Tomcat及MySQL的服务集群,实现双机热备与负载...

    Spring + Ibatis 与mysql集群集成

    5. **高可用性和负载均衡**:在Spring中,可以使用Ribbon或Hystrix组件实现对MySQL集群的负载均衡。这些组件会自动选择合适的SQL节点进行连接,当某个节点出现故障时,可以自动切换到其他节点。 6. **数据一致性**...

    python实现mysql的读写分离及负载均衡

    总的来说,Python实现MySQL的读写分离和负载均衡需要在应用程序中嵌入合适的逻辑,合理地管理数据库连接,并确保在系统架构层面做到低耦合和高可扩展性。这样的设计不仅降低了维护成本,也能在不显著增加开发工作量...

    jdbc连接mysql的文档

    通过示例代码,开发者可以了解如何使用MySQL Connector/J实现各种数据库操作,包括连接数据库、执行SQL语句、处理存储过程以及获取自增字段值等。 5. 参考信息。文档提供了关于驱动程序/数据源类名、URL语法和配置...

    nginx_tomcat_redis搭建负载均衡共享session

    - **负载均衡**:Nginx支持多种负载均衡策略,如轮询、权重轮询、最少连接数等。在这里,它会根据预设的策略将请求分发到多个Tomcat实例,确保各服务器负载均匀。 2. **Tomcat**: - **集群**:为了提高应用的...

    MySQL MySQL MySQL MySQL jar

    2. **高可用性**:MySQL支持主从复制,可以在多个服务器之间同步数据,提供故障转移和负载均衡能力。 3. **安全性**:通过用户权限管理和加密功能,MySQL确保了数据的安全性。 4. **跨平台**:MySQL可在Windows、...

    mysql-connector-java-5.1.37-jar

    对于生产环境,还需要考虑负载均衡、故障转移和数据安全策略,以确保应用的稳定性和数据的安全性。 总结来说,"mysql-connector-java-5.1.37-jar"是连接Java应用与MySQL数据库的关键组件,提供了高效、可靠的JDBC...

Global site tag (gtag.js) - Google Analytics