- 浏览: 987735 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Java动态代理:http://www.cnblogs.com/xiaoluo501395377/p/3383130.html
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
//NonRegisteringDriver
//负载均衡连接获取方法
//LoadBalancingConnectionProxy
下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
来看LoadBalancingConnectionProxy创建连接:
在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
//Proxy
回到LoadBalancingConnectionProxy
这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
if(StringUtils.startsWithIgnoreCase(url, "jdbc:mysql:loadbalance://")) return connectLoadBalanced(url, info);
//NonRegisteringDriver
//负载均衡连接获取方法
private Connection connectLoadBalanced(String url, Properties info) throws SQLException { //解析url Properties parsedProps = parseURL(url, info); parsedProps.remove("roundRobinLoadBalance"); if(parsedProps == null) return null; String hostValues = parsedProps.getProperty("HOST"); List hostList = null; if(hostValues != null) hostList = StringUtils.split(hostValues, ",", true); if(hostList == null) { hostList = new ArrayList(); hostList.add("localhost:3306"); } //构造负载均衡连接代理 LoadBalancingConnectionProxy proxyBal = new LoadBalancingConnectionProxy(hostList, parsedProps); //通过代理新建代理实例Connection return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { java.sql.Connection.class }, proxyBal); }
//LoadBalancingConnectionProxy
public class LoadBalancingConnectionProxy implements InvocationHandler, PingTarget { private static Method getLocalTimeMethod; public static final String BLACKLIST_TIMEOUT_PROPERTY_KEY = "loadBalanceBlacklistTimeout"; private Connection currentConn; private List hostList; private Map liveConnections; private Map connectionsToHostsMap; private long responseTimes[]; private Map hostsToListIndexMap; private boolean inTransaction; private long transactionStartTime; private Properties localProps; private boolean isClosed; private BalanceStrategy balancer;//负载均衡策略 private int retriesAllDown; private static Map globalBlacklist = new HashMap(); private int globalBlacklistTimeout; static { try { getLocalTimeMethod = (java.lang.System.class).getMethod("nanoTime", new Class[0]); } catch(SecurityException e) { } catch(NoSuchMethodException e) { } } LoadBalancingConnectionProxy(List hosts, Properties props) throws SQLException { inTransaction = false; transactionStartTime = 0L; isClosed = false; globalBlacklistTimeout = 0; hostList = hosts; int numHosts = hostList.size(); //存活连接 liveConnections = new HashMap(numHosts); connectionsToHostsMap = new HashMap(numHosts); //Host连接的相应时间 responseTimes = new long[numHosts]; hostsToListIndexMap = new HashMap(numHosts); for(int i = 0; i < numHosts; i++) hostsToListIndexMap.put(hostList.get(i), new Integer(i)); localProps = (Properties)props.clone(); localProps.remove("HOST"); localProps.remove("PORT"); localProps.setProperty("useLocalSessionState", "true"); String strategy = localProps.getProperty("loadBalanceStrategy", "random"); String retriesAllDownAsString = localProps.getProperty("retriesAllDown", "120"); try { retriesAllDown = Integer.parseInt(retriesAllDownAsString); } catch(NumberFormatException nfe) { throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }), "S1009", null); } String blacklistTimeoutAsString = localProps.getProperty("loadBalanceBlacklistTimeout", "0"); try { globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString); } catch(NumberFormatException nfe) { throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }), "S1009", null); } //构建负载均衡策略 if("random".equals(strategy)) balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0); else if("bestResponseTime".equals(strategy)) balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0); else balancer = (BalanceStrategy)Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0); //初始化负载均衡器 balancer.init(null, props); //从负载均衡器获取连接 pickNewConnection(); } }
下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
public static List loadExtensions(Connection conn, Properties props, String extensionClassNames, String errorMessageKey, ExceptionInterceptor exceptionInterceptor) throws SQLException { List extensionList = new LinkedList(); List interceptorsToCreate = StringUtils.split(extensionClassNames, ",", true); Iterator iter = interceptorsToCreate.iterator(); String className = null; try { Extension extensionInstance; for(; iter.hasNext(); extensionList.add(extensionInstance)) { className = iter.next().toString(); //加载className extensionInstance = (Extension)Class.forName(className).newInstance(); //初始化class extensionInstance.init(conn, props); } } catch(Throwable t) { SQLException sqlEx = SQLError.createSQLException(Messages.getString(errorMessageKey, new Object[] { className }), exceptionInterceptor); sqlEx.initCause(t); throw sqlEx; } return extensionList; }
再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
public class BestResponseTimeBalanceStrategy implements BalanceStrategy { public void init(Connection connection, Properties properties) throws SQLException { //初始化为空,待扩展 } }
回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
rivate synchronized void pickNewConnection() throws SQLException { if(currentConn == null) { currentConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown); return; } else { Connection newConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown); newConn.setTransactionIsolation(currentConn.getTransactionIsolation()); newConn.setAutoCommit(currentConn.getAutoCommit()); currentConn = newConn; return; } }
查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
public Connection pickConnection(LoadBalancingConnectionProxy proxy, List configuredHosts, Map liveConnections, long responseTimes[], int numRetries) throws SQLException { SQLException ex; label0: { Map blackList = proxy.getGlobalBlacklist(); ex = null; int attempts = 0; Connection conn; do { if(attempts >= numRetries) break label0; long minResponseTime = 9223372036854775807L; int bestHostIndex = 0; //获取代理host黑名单 if(blackList.size() == configuredHosts.size()) blackList = proxy.getGlobalBlacklist(); //从responseTimes筛选出相应时间最小的host索引index for(int i = 0; i < responseTimes.length; i++) { long candidateResponseTime = responseTimes[i]; if(candidateResponseTime >= minResponseTime || blackList.containsKey(configuredHosts.get(i))) continue; if(candidateResponseTime == 0L) { bestHostIndex = i; break; } bestHostIndex = i; minResponseTime = candidateResponseTime; } //从configuredHosts获取host String bestHost = (String)configuredHosts.get(bestHostIndex); //从liveConnections获取连接 conn = (Connection)liveConnections.get(bestHost); if(conn != null) break; try { //如果liveConnections不存在host对应的连接,则通过代理去创建一个连接 conn = proxy.createConnectionForHost(bestHost); break; } catch(SQLException sqlEx) { ex = sqlEx; if((sqlEx instanceof CommunicationsException) || "08S01".equals(sqlEx.getSQLState())) { //如果创建连接异常,则加入黑名单 proxy.addToGlobalBlacklist(bestHost); blackList.put(bestHost, null); if(blackList.size() == configuredHosts.size()) { attempts++; try { Thread.sleep(250L); } catch(InterruptedException e) { } blackList = proxy.getGlobalBlacklist(); } } else { throw sqlEx; } } } while(true); return conn; } if(ex != null) throw ex; else return null; }
来看LoadBalancingConnectionProxy创建连接:
public synchronized Connection createConnectionForHost(String hostPortSpec) throws SQLException { Properties connProps = (Properties)localProps.clone(); String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(hostPortSpec); if(hostPortPair[1] == null) hostPortPair[1] = "3306"; connProps.setProperty("HOST", hostPortSpec); connProps.setProperty("PORT", hostPortPair[1]); //返回的实际为ConnectionImpl Connection conn = ConnectionImpl.getInstance(hostPortSpec, Integer.parseInt(hostPortPair[1]), connProps, connProps.getProperty("DBNAME"), "jdbc:mysql://" + hostPortPair[0] + ":" + hostPortPair[1] + "/"); liveConnections.put(hostPortSpec, conn); connectionsToHostsMap.put(conn, hostPortSpec); return conn; }
在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { java.sql.Connection.class }, proxyBal);
//Proxy
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException { if (h == null) { throw new NullPointerException(); } /* * Look up or generate the designated proxy class. */ Class<?> cl = getProxyClass0(loader, interfaces); // stack walk magic: do not refactor /* * Invoke its constructor with the designated invocation handler. */ try { final Constructor<?> cons = cl.getConstructor(constructorParams); final InvocationHandler ih = h; SecurityManager sm = System.getSecurityManager(); if (sm != null && ProxyAccessHelper.needsNewInstanceCheck(cl)) { // create proxy instance with doPrivilege as the proxy class may // implement non-public interfaces that requires a special permission return AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { //创建实例 return newInstance(cons, ih); } }); } else { return newInstance(cons, ih); } } catch (NoSuchMethodException e) { throw new InternalError(e.toString()); } } //创建实例 private static Object newInstance(Constructor<?> cons, InvocationHandler h) { try { return cons.newInstance(new Object[] {h} ); } catch (IllegalAccessException e) { throw new InternalError(e.toString()); } catch (InstantiationException e) { throw new InternalError(e.toString()); } catch (InvocationTargetException e) { Throwable t = e.getCause(); if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new InternalError(t.toString()); } } }
回到LoadBalancingConnectionProxy
public Object invoke(Object proxy, Method method, Object args[]) throws Throwable { String methodName = method.getName(); if("equals".equals(methodName) && args.length == 1) if(args[0] instanceof Proxy) return Boolean.valueOf(((Proxy)args[0]).equals(this)); else return Boolean.valueOf(equals(args[0])); if("close".equals(methodName)) { synchronized(this) { for(Iterator allConnections = liveConnections.values().iterator(); allConnections.hasNext(); ((Connection)allConnections.next()).close()); if(!isClosed) balancer.destroy(); liveConnections.clear(); connectionsToHostsMap.clear(); } return null; } if("isClosed".equals(methodName)) return Boolean.valueOf(isClosed); if(isClosed) throw SQLError.createSQLException("No operations allowed after connection closed.", "08003", null); if(!inTransaction) { inTransaction = true; transactionStartTime = getLocalTimeBestResolution(); } Object result = null; try { //关键在这里,当调用Connection的方法是,实际上调用的currentConn的对应方法 //这个currentConn我们前面有说 result = method.invoke(currentConn, args); if(result != null) { if(result instanceof Statement) ((Statement)result).setPingTarget(this); result = proxyIfInterfaceIsJdbc(result, result.getClass()); } } catch(InvocationTargetException e) { dealWithInvocationException(e); } finally { if("commit".equals(methodName) || "rollback".equals(methodName)) { inTransaction = false; String host = (String)connectionsToHostsMap.get(currentConn); if(host != null) { int hostIndex = ((Integer)hostsToListIndexMap.get(host)).intValue(); synchronized(responseTimes) { responseTimes[hostIndex] = getLocalTimeBestResolution() - transactionStartTime; } } pickNewConnection(); } } return result; }
这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
发表评论
-
Mysql PreparedStatement 批处理
2016-12-06 18:09 1390JDBC驱动初始化-Mysql:http://donald-d ... -
MySQL ServerPreparedStatement查询
2016-12-06 14:42 1303JDBC驱动初始化-Mysql:http://donald-d ... -
MysqlSQL PreparedStatement的查询
2016-12-06 11:40 2041JDBC驱动初始化-Mysql:http://donald-d ... -
Mysql预编译SQL
2016-12-05 19:06 1174JDBC驱动初始化-Mysql:http://donald-d ... -
ConnectionImp创建MysqlIO
2016-12-05 19:01 1033JDBC驱动初始化-Mysql:http://donald-d ... -
Mysql主从复制读写分离连接的获取
2016-12-01 08:43 1178JDBC驱动初始化-Mysql:http://donald-d ... -
JDBC连接的获取
2016-11-30 12:44 2940JDBC驱动初始化-Mysql:http://donald-d ... -
JDBC驱动初始化-Mysql
2016-11-30 12:41 3061JDBC驱动初始化-Mysql:http://donald-d ... -
mysql 存储过程
2016-07-17 14:49 908存储过程基础:http://sishuok.com/forum ... -
java.sql.Date,java.util.Date,java.sql.Timestamp的区别
2016-07-17 11:50 885java.sql.Date,jdbc映射数据库中的date类型 ... -
JDBC PreparedStatement 的用法
2016-07-15 17:27 901import java.sql.Connection; im ...
相关推荐
在Windows环境下搭建MySQL主备双向复制以及Mycat负载均衡,是一种常见的数据库架构策略,用于提高数据的可用性、一致性和系统的负载均衡能力。以下详细的知识点说明了这一过程: 1. MySQL主备双向复制的概念和作用...
通过不同的负载均衡策略,如轮询、权重、最少连接数等,Nginx可以在服务器集群中智能地分配请求,保证服务的稳定性和效率。 ### 基本配置结构 一个基本的Nginx负载均衡配置通常包含以下部分: 1. **upstream块**...
MySQL 高可用性、负载均衡解决方案 MySQL 是世界上最流行的开源数据库,已有1100多万的安装,每天超过五万的下载。MySQL 为全球开发者、DBA 和 IT 管理者在可靠性、性能、易用性方面提供了选择。第三方市场调查...
为了提升数据库系统的效率和稳定性,通常会采用负载均衡策略,构建多台MySQL服务器集群。本文主要探讨了在RedHat9.0操作系统上搭建MySQL服务器集群的方法以及应对主从服务器故障的解决方案。 首先,搭建服务器集群...
MySQL主从复制是一种常见的数据库高可用性和负载均衡解决方案,它通过在主库和从库之间同步数据,实现读写分离、容灾冗余和负载均衡,从而提高系统的整体性能和稳定性。 1. 读写分离:主从复制的核心是将读写操作...
配置Haproxy进行MySQL负载均衡的步骤大致如下: 1. 安装Haproxy:通常通过包管理器如`apt-get`或`yum`在Linux系统中安装。 2. 配置Haproxy:编辑`/etc/haproxy/haproxy.cfg`,设置全局和默认参数,以及定义监听器...
### MySQL Proxy 快速实现读写分离以及负载均衡 #### 一、概述 在数据库管理领域,MySQL Proxy 是一个开源的代理服务器,它为 MySQL 提供了一层中间件,能够帮助用户实现数据库的读写分离、负载均衡等功能,进而...
【MySQL服务器集群负载均衡】 MySQL是一个高性能、多线程的关系型数据库管理系统,广泛应用于各种平台,具有优秀的可扩展性。在大规模的生产环境中,通过部署带有负载均衡功能的MySQL服务器集群,可以显著提升...
4. 支持 TCP 协议的负载均衡转发,可以对 MySQL 读进行负载均衡,对后端的 MySQL 节点进行检测和负载均衡,可以用 LVS+Keepalived 对 MySQL 主从做负载均衡。 HAProxy 负载均衡策略非常多,HAProxy 的负载均衡算法...
### 无标题Keepalived+Nginx+Tomcat+MySQL部署双机热备、负载均衡应用服务器 #### 配置概述 本文档旨在详细介绍如何在Linux环境下构建一套基于Keepalived、Nginx、Tomcat及MySQL的服务集群,实现双机热备与负载...
5. **高可用性和负载均衡**:在Spring中,可以使用Ribbon或Hystrix组件实现对MySQL集群的负载均衡。这些组件会自动选择合适的SQL节点进行连接,当某个节点出现故障时,可以自动切换到其他节点。 6. **数据一致性**...
总的来说,Python实现MySQL的读写分离和负载均衡需要在应用程序中嵌入合适的逻辑,合理地管理数据库连接,并确保在系统架构层面做到低耦合和高可扩展性。这样的设计不仅降低了维护成本,也能在不显著增加开发工作量...
通过示例代码,开发者可以了解如何使用MySQL Connector/J实现各种数据库操作,包括连接数据库、执行SQL语句、处理存储过程以及获取自增字段值等。 5. 参考信息。文档提供了关于驱动程序/数据源类名、URL语法和配置...
- **负载均衡**:Nginx支持多种负载均衡策略,如轮询、权重轮询、最少连接数等。在这里,它会根据预设的策略将请求分发到多个Tomcat实例,确保各服务器负载均匀。 2. **Tomcat**: - **集群**:为了提高应用的...
2. **高可用性**:MySQL支持主从复制,可以在多个服务器之间同步数据,提供故障转移和负载均衡能力。 3. **安全性**:通过用户权限管理和加密功能,MySQL确保了数据的安全性。 4. **跨平台**:MySQL可在Windows、...
对于生产环境,还需要考虑负载均衡、故障转移和数据安全策略,以确保应用的稳定性和数据的安全性。 总结来说,"mysql-connector-java-5.1.37-jar"是连接Java应用与MySQL数据库的关键组件,提供了高效、可靠的JDBC...
- **复制功能**:MySQL 5.1增强了主从复制功能,可以实现数据的实时备份和负载均衡。 - **触发器和存储过程**:提供更强大的数据库编程能力,简化复杂业务逻辑。 - **视图**:允许创建虚拟表,方便数据的聚合和展示...