- 浏览: 980131 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Java动态代理:http://www.cnblogs.com/xiaoluo501395377/p/3383130.html
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
//NonRegisteringDriver
//负载均衡连接获取方法
//LoadBalancingConnectionProxy
下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
来看LoadBalancingConnectionProxy创建连接:
在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
//Proxy
回到LoadBalancingConnectionProxy
这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
if(StringUtils.startsWithIgnoreCase(url, "jdbc:mysql:loadbalance://")) return connectLoadBalanced(url, info);
//NonRegisteringDriver
//负载均衡连接获取方法
private Connection connectLoadBalanced(String url, Properties info) throws SQLException { //解析url Properties parsedProps = parseURL(url, info); parsedProps.remove("roundRobinLoadBalance"); if(parsedProps == null) return null; String hostValues = parsedProps.getProperty("HOST"); List hostList = null; if(hostValues != null) hostList = StringUtils.split(hostValues, ",", true); if(hostList == null) { hostList = new ArrayList(); hostList.add("localhost:3306"); } //构造负载均衡连接代理 LoadBalancingConnectionProxy proxyBal = new LoadBalancingConnectionProxy(hostList, parsedProps); //通过代理新建代理实例Connection return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { java.sql.Connection.class }, proxyBal); }
//LoadBalancingConnectionProxy
public class LoadBalancingConnectionProxy implements InvocationHandler, PingTarget { private static Method getLocalTimeMethod; public static final String BLACKLIST_TIMEOUT_PROPERTY_KEY = "loadBalanceBlacklistTimeout"; private Connection currentConn; private List hostList; private Map liveConnections; private Map connectionsToHostsMap; private long responseTimes[]; private Map hostsToListIndexMap; private boolean inTransaction; private long transactionStartTime; private Properties localProps; private boolean isClosed; private BalanceStrategy balancer;//负载均衡策略 private int retriesAllDown; private static Map globalBlacklist = new HashMap(); private int globalBlacklistTimeout; static { try { getLocalTimeMethod = (java.lang.System.class).getMethod("nanoTime", new Class[0]); } catch(SecurityException e) { } catch(NoSuchMethodException e) { } } LoadBalancingConnectionProxy(List hosts, Properties props) throws SQLException { inTransaction = false; transactionStartTime = 0L; isClosed = false; globalBlacklistTimeout = 0; hostList = hosts; int numHosts = hostList.size(); //存活连接 liveConnections = new HashMap(numHosts); connectionsToHostsMap = new HashMap(numHosts); //Host连接的相应时间 responseTimes = new long[numHosts]; hostsToListIndexMap = new HashMap(numHosts); for(int i = 0; i < numHosts; i++) hostsToListIndexMap.put(hostList.get(i), new Integer(i)); localProps = (Properties)props.clone(); localProps.remove("HOST"); localProps.remove("PORT"); localProps.setProperty("useLocalSessionState", "true"); String strategy = localProps.getProperty("loadBalanceStrategy", "random"); String retriesAllDownAsString = localProps.getProperty("retriesAllDown", "120"); try { retriesAllDown = Integer.parseInt(retriesAllDownAsString); } catch(NumberFormatException nfe) { throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }), "S1009", null); } String blacklistTimeoutAsString = localProps.getProperty("loadBalanceBlacklistTimeout", "0"); try { globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString); } catch(NumberFormatException nfe) { throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }), "S1009", null); } //构建负载均衡策略 if("random".equals(strategy)) balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0); else if("bestResponseTime".equals(strategy)) balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0); else balancer = (BalanceStrategy)Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0); //初始化负载均衡器 balancer.init(null, props); //从负载均衡器获取连接 pickNewConnection(); } }
下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
public static List loadExtensions(Connection conn, Properties props, String extensionClassNames, String errorMessageKey, ExceptionInterceptor exceptionInterceptor) throws SQLException { List extensionList = new LinkedList(); List interceptorsToCreate = StringUtils.split(extensionClassNames, ",", true); Iterator iter = interceptorsToCreate.iterator(); String className = null; try { Extension extensionInstance; for(; iter.hasNext(); extensionList.add(extensionInstance)) { className = iter.next().toString(); //加载className extensionInstance = (Extension)Class.forName(className).newInstance(); //初始化class extensionInstance.init(conn, props); } } catch(Throwable t) { SQLException sqlEx = SQLError.createSQLException(Messages.getString(errorMessageKey, new Object[] { className }), exceptionInterceptor); sqlEx.initCause(t); throw sqlEx; } return extensionList; }
再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
public class BestResponseTimeBalanceStrategy implements BalanceStrategy { public void init(Connection connection, Properties properties) throws SQLException { //初始化为空,待扩展 } }
回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
rivate synchronized void pickNewConnection() throws SQLException { if(currentConn == null) { currentConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown); return; } else { Connection newConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown); newConn.setTransactionIsolation(currentConn.getTransactionIsolation()); newConn.setAutoCommit(currentConn.getAutoCommit()); currentConn = newConn; return; } }
查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
public Connection pickConnection(LoadBalancingConnectionProxy proxy, List configuredHosts, Map liveConnections, long responseTimes[], int numRetries) throws SQLException { SQLException ex; label0: { Map blackList = proxy.getGlobalBlacklist(); ex = null; int attempts = 0; Connection conn; do { if(attempts >= numRetries) break label0; long minResponseTime = 9223372036854775807L; int bestHostIndex = 0; //获取代理host黑名单 if(blackList.size() == configuredHosts.size()) blackList = proxy.getGlobalBlacklist(); //从responseTimes筛选出相应时间最小的host索引index for(int i = 0; i < responseTimes.length; i++) { long candidateResponseTime = responseTimes[i]; if(candidateResponseTime >= minResponseTime || blackList.containsKey(configuredHosts.get(i))) continue; if(candidateResponseTime == 0L) { bestHostIndex = i; break; } bestHostIndex = i; minResponseTime = candidateResponseTime; } //从configuredHosts获取host String bestHost = (String)configuredHosts.get(bestHostIndex); //从liveConnections获取连接 conn = (Connection)liveConnections.get(bestHost); if(conn != null) break; try { //如果liveConnections不存在host对应的连接,则通过代理去创建一个连接 conn = proxy.createConnectionForHost(bestHost); break; } catch(SQLException sqlEx) { ex = sqlEx; if((sqlEx instanceof CommunicationsException) || "08S01".equals(sqlEx.getSQLState())) { //如果创建连接异常,则加入黑名单 proxy.addToGlobalBlacklist(bestHost); blackList.put(bestHost, null); if(blackList.size() == configuredHosts.size()) { attempts++; try { Thread.sleep(250L); } catch(InterruptedException e) { } blackList = proxy.getGlobalBlacklist(); } } else { throw sqlEx; } } } while(true); return conn; } if(ex != null) throw ex; else return null; }
来看LoadBalancingConnectionProxy创建连接:
public synchronized Connection createConnectionForHost(String hostPortSpec) throws SQLException { Properties connProps = (Properties)localProps.clone(); String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(hostPortSpec); if(hostPortPair[1] == null) hostPortPair[1] = "3306"; connProps.setProperty("HOST", hostPortSpec); connProps.setProperty("PORT", hostPortPair[1]); //返回的实际为ConnectionImpl Connection conn = ConnectionImpl.getInstance(hostPortSpec, Integer.parseInt(hostPortPair[1]), connProps, connProps.getProperty("DBNAME"), "jdbc:mysql://" + hostPortPair[0] + ":" + hostPortPair[1] + "/"); liveConnections.put(hostPortSpec, conn); connectionsToHostsMap.put(conn, hostPortSpec); return conn; }
在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { java.sql.Connection.class }, proxyBal);
//Proxy
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException { if (h == null) { throw new NullPointerException(); } /* * Look up or generate the designated proxy class. */ Class<?> cl = getProxyClass0(loader, interfaces); // stack walk magic: do not refactor /* * Invoke its constructor with the designated invocation handler. */ try { final Constructor<?> cons = cl.getConstructor(constructorParams); final InvocationHandler ih = h; SecurityManager sm = System.getSecurityManager(); if (sm != null && ProxyAccessHelper.needsNewInstanceCheck(cl)) { // create proxy instance with doPrivilege as the proxy class may // implement non-public interfaces that requires a special permission return AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { //创建实例 return newInstance(cons, ih); } }); } else { return newInstance(cons, ih); } } catch (NoSuchMethodException e) { throw new InternalError(e.toString()); } } //创建实例 private static Object newInstance(Constructor<?> cons, InvocationHandler h) { try { return cons.newInstance(new Object[] {h} ); } catch (IllegalAccessException e) { throw new InternalError(e.toString()); } catch (InstantiationException e) { throw new InternalError(e.toString()); } catch (InvocationTargetException e) { Throwable t = e.getCause(); if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new InternalError(t.toString()); } } }
回到LoadBalancingConnectionProxy
public Object invoke(Object proxy, Method method, Object args[]) throws Throwable { String methodName = method.getName(); if("equals".equals(methodName) && args.length == 1) if(args[0] instanceof Proxy) return Boolean.valueOf(((Proxy)args[0]).equals(this)); else return Boolean.valueOf(equals(args[0])); if("close".equals(methodName)) { synchronized(this) { for(Iterator allConnections = liveConnections.values().iterator(); allConnections.hasNext(); ((Connection)allConnections.next()).close()); if(!isClosed) balancer.destroy(); liveConnections.clear(); connectionsToHostsMap.clear(); } return null; } if("isClosed".equals(methodName)) return Boolean.valueOf(isClosed); if(isClosed) throw SQLError.createSQLException("No operations allowed after connection closed.", "08003", null); if(!inTransaction) { inTransaction = true; transactionStartTime = getLocalTimeBestResolution(); } Object result = null; try { //关键在这里,当调用Connection的方法是,实际上调用的currentConn的对应方法 //这个currentConn我们前面有说 result = method.invoke(currentConn, args); if(result != null) { if(result instanceof Statement) ((Statement)result).setPingTarget(this); result = proxyIfInterfaceIsJdbc(result, result.getClass()); } } catch(InvocationTargetException e) { dealWithInvocationException(e); } finally { if("commit".equals(methodName) || "rollback".equals(methodName)) { inTransaction = false; String host = (String)connectionsToHostsMap.get(currentConn); if(host != null) { int hostIndex = ((Integer)hostsToListIndexMap.get(host)).intValue(); synchronized(responseTimes) { responseTimes[hostIndex] = getLocalTimeBestResolution() - transactionStartTime; } } pickNewConnection(); } } return result; }
这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
发表评论
-
Mysql PreparedStatement 批处理
2016-12-06 18:09 1385JDBC驱动初始化-Mysql:http://donald-d ... -
MySQL ServerPreparedStatement查询
2016-12-06 14:42 1291JDBC驱动初始化-Mysql:http://donald-d ... -
MysqlSQL PreparedStatement的查询
2016-12-06 11:40 2028JDBC驱动初始化-Mysql:http://donald-d ... -
Mysql预编译SQL
2016-12-05 19:06 1160JDBC驱动初始化-Mysql:http://donald-d ... -
ConnectionImp创建MysqlIO
2016-12-05 19:01 1021JDBC驱动初始化-Mysql:http://donald-d ... -
Mysql主从复制读写分离连接的获取
2016-12-01 08:43 1162JDBC驱动初始化-Mysql:http://donald-d ... -
JDBC连接的获取
2016-11-30 12:44 2933JDBC驱动初始化-Mysql:http://donald-d ... -
JDBC驱动初始化-Mysql
2016-11-30 12:41 3050JDBC驱动初始化-Mysql:http://donald-d ... -
mysql 存储过程
2016-07-17 14:49 899存储过程基础:http://sishuok.com/forum ... -
java.sql.Date,java.util.Date,java.sql.Timestamp的区别
2016-07-17 11:50 880java.sql.Date,jdbc映射数据库中的date类型 ... -
JDBC PreparedStatement 的用法
2016-07-15 17:27 892import java.sql.Connection; im ...
相关推荐
Jupyter-Notebook
Jupyter-Notebook
高效甘特图模板下载-精心整理.zip
lstm Summary Framework: z = U>x, x u Uz Criteria for choosing U: • PCA: maximize projected variance • CCA: maximize projected correlation • FDA: maximize projected intraclass variance
OpenGL调试工具,适合图形开发者,包括视频开发,播放器开始以及游戏开发者。
全国行政区划shp最新图.zip
全国研究生招生与在校数据+国家线-最新.zip
Jupyter-Notebook
直播电商交流平台 SSM毕业设计 附带论文 启动教程:https://www.bilibili.com/video/BV1GK1iYyE2B
《林黛玉进贾府》课本剧剧本
2000-2020年沪深A股上市公司融资约束程度SA指数-最新数据发布.zip
PPT模版资料,PPT模版资料
CPA注会考试最新教材资料-最新发布.zip
1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于人工智能、计算机科学与技术等相关专业,更为适合; 4、下载使用后,可先查看README.md文件(如有),本项目仅用作交流学习参考,请切勿用于商业用途。
内容概要:本文提供了一个完整的职工管理系统的C++源代码。通过面向对象的编程方法,实现了包括创建新职工、查询、增加、修改、删除、排序、统计以及存储和恢复职工数据在内的多个基本操作功能。该系统支持不同的用户角色(如管理员与老板),并通过菜单驱动方式让用户方便地进行相关操作。此外,还包括了错误检测机制,确保操作过程中的异常得到及时处理。 适合人群:有一定C++语言基础,特别是面向对象编程经验的程序员;企业管理人员和技术开发人员。 使用场景及目标:适用于中小型企业内部的人力资源管理部门或IT部门,用于维护员工基本信息数据库,提高工作效率。通过本项目的学习可以加深对链表、类和对象的理解。 阅读建议:建议先熟悉C++的基本语法和面向对象概念,再深入学习代码的具体实现细节。对于关键函数,比如exchange、creatilist等,应当重点关注并动手实践以加强理解。
Jupyter-Notebook
考研公共课历年真题集-最新发布.zip
Huawei-HKUST Joint Workshop on Theory for Future Wireless 15-16 September 2022 华为-香港科技大学未来无线理论联合研讨会 Speaker:Jingwen Tong
演出人员与观众疫情信息管理系统 SSM毕业设计 附带论文 启动教程:https://www.bilibili.com/video/BV1GK1iYyE2B
《林黛玉进贾府》课本剧剧本.pdf