JDBC driver initialization (MySQL): http://donald-draper.iteye.com/blog/2342010
Obtaining a JDBC connection: http://donald-draper.iteye.com/blog/2342011
Obtaining a MySQL load-balanced connection: http://donald-draper.iteye.com/blog/2342089
Obtaining a MySQL master/slave replication (read/write splitting) connection: http://donald-draper.iteye.com/blog/2342108
How ConnectionImpl creates MysqlIO: http://donald-draper.iteye.com/blog/2342959
MySQL precompiled SQL: http://donald-draper.iteye.com/blog/2342960
MySQL PreparedStatement queries: http://donald-draper.iteye.com/blog/2343083
MySQL ServerPreparedStatement queries: http://donald-draper.iteye.com/blog/2343124
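In the previous articles we covered statement queries. Today we look at batch processing. Before getting to batching, let's first see how a PreparedStatement sets its parameter values.
// Setting a value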
public void setString(int parameterIndex, String x) throws SQLException {
    if(x == null) {
        setNull(parameterIndex, 1);
    } else {
        checkClosed();
        int stringLength = x.length();
        if(connection.isNoBackslashEscapesSet()) {
            boolean needsHexEscape = isEscapeNeededForString(x, stringLength);
            if(!needsHexEscape) {
                byte parameterAsBytes[] = null;
                StringBuffer quotedString = new StringBuffer(x.length() + 2);
                quotedString.append('\'');
                quotedString.append(x);
                quotedString.append('\'');
                if(!isLoadDataQuery)
                    parameterAsBytes = StringUtils.getBytes(quotedString.toString(), charConverter, charEncoding, connection.getServerCharacterEncoding(), connection.parserKnowsUnicode(), getExceptionInterceptor());
                else
                    parameterAsBytes = quotedString.toString().getBytes();
                // convert the String to bytes and store the value
                setInternal(parameterIndex, parameterAsBytes);
            } else {
                byte parameterAsBytes[] = null;
                if(!isLoadDataQuery)
                    parameterAsBytes = StringUtils.getBytes(x, charConverter, charEncoding, connection.getServerCharacterEncoding(), connection.parserKnowsUnicode(), getExceptionInterceptor());
                else
                    parameterAsBytes = x.getBytes();
                // convert the String to bytes and store the value
                setBytes(parameterIndex, parameterAsBytes);
            }
            return;
        }
        String parameterAsString = x;
        boolean needsQuoted = true;
        if(isLoadDataQuery || isEscapeNeededForString(x, stringLength)) {
            needsQuoted = false;
            StringBuffer buf = new StringBuffer((int)((double)x.length() * 1.1000000000000001D));
            buf.append('\'');
            for(int i = 0; i < stringLength; i++) {
                char c = x.charAt(i);
                switch(c) {
                default:
                    break;
                case 0: // '\0'
                    buf.append('\\');
                    buf.append('0');
                    continue;
                case 10: // '\n'
                    buf.append('\\');
                    buf.append('n');
                    continue;
                case 13: // '\r'
                    buf.append('\\');
                    buf.append('r');
                    continue;
                case 92: // '\\'
                    buf.append('\\');
                    buf.append('\\');
                    continue;
                case 39: // '\''
                    buf.append('\\');
                    buf.append('\'');
                    continue;
                case 34: // '"'
                    if(usingAnsiMode)
                        buf.append('\\');
                    buf.append('"');
                    continue;
                case 26: // '\032'
                    buf.append('\\');
                    buf.append('Z');
                    continue;
                case 165:
                case 8361:
                    if(charsetEncoder != null) {
                        CharBuffer cbuf = CharBuffer.allocate(1);
                        ByteBuffer bbuf = ByteBuffer.allocate(1);
                        cbuf.put(c);
                        cbuf.position(0);
                        charsetEncoder.encode(cbuf, bbuf, true);
                        if(bbuf.get(0) == 92)
                            buf.append('\\');
                    }
                    break;
                }
                buf.append(c);
            }
            buf.append('\'');
            parameterAsString = buf.toString();
        }
        byte parameterAsBytes[] = null;
        if(!isLoadDataQuery) {
            if(needsQuoted)
                parameterAsBytes = StringUtils.getBytesWrapped(parameterAsString, '\'', '\'', charConverter, charEncoding, connection.getServerCharacterEncoding(), connection.parserKnowsUnicode(), getExceptionInterceptor());
            else
                parameterAsBytes = StringUtils.getBytes(parameterAsString, charConverter, charEncoding, connection.getServerCharacterEncoding(), connection.parserKnowsUnicode(), getExceptionInterceptor());
        } else {
            parameterAsBytes = parameterAsString.getBytes();
        }
        setInternal(parameterIndex, parameterAsBytes);
        // 12 is java.sql.Types.VARCHAR
        parameterTypes[(parameterIndex - 1) + getParameterIndexOffset()] = 12;
    }
}
Let's first look at the PreparedStatement's internal fields, and then at the setInternal method.
// Fields of PreparedStatement that describe its parameters
private boolean isNull[]; // whether each parameter is null
private boolean isStream[]; // whether each parameter is a stream (CLOB or BLOB)
protected int numberOfExecutions;
protected String originalSql;
protected int parameterCount; // number of parameters
protected MysqlParameterMetadata parameterMetaData;
private InputStream parameterStreams[]; // parameter streams
private byte parameterValues[][]; // parameter values
protected int parameterTypes[]; // parameter types
Now let's look at the setInternal method.
protected final void setInternal(int paramIndex, byte val[]) throws SQLException {
    if(isClosed) {
        throw SQLError.createSQLException(Messages.getString("PreparedStatement.48"), "S1009", getExceptionInterceptor());
    } else {
        int parameterIndexOffset = getParameterIndexOffset();
        checkBounds(paramIndex, parameterIndexOffset);
        // the parameter at paramIndex is not a stream
        isStream[(paramIndex - 1) + parameterIndexOffset] = false;
        // the parameter at paramIndex is not null
        isNull[(paramIndex - 1) + parameterIndexOffset] = false;
        // the parameter at paramIndex has no stream
        parameterStreams[(paramIndex - 1) + parameterIndexOffset] = null;
        // the value of the parameter at paramIndex
        parameterValues[(paramIndex - 1) + parameterIndexOffset] = val;
        return;
    }
}

public void setBytes(int parameterIndex, byte x[]) throws SQLException {
    setBytes(parameterIndex, x, true, true);
    if(x != null)
        // -2 is java.sql.Types.BINARY
        parameterTypes[(parameterIndex - 1) + getParameterIndexOffset()] = -2;
}

protected void setBytes(int parameterIndex, byte x[], boolean checkForIntroducer, boolean escapeForMBChars) throws SQLException {
    if(x == null) {
        setNull(parameterIndex, -2);
    } else {
        String connectionEncoding = connection.getEncoding();
        if(connection.isNoBackslashEscapesSet() || escapeForMBChars && connection.getUseUnicode() && connectionEncoding != null && CharsetMapping.isMultibyteCharset(connectionEncoding)) {
            // write the value as a hex literal: x'...'
            ByteArrayOutputStream bOut = new ByteArrayOutputStream(x.length * 2 + 3);
            bOut.write(120); // 'x'
            bOut.write(39); // '\''
            for(int i = 0; i < x.length; i++) {
                // despite the names, lowBits is the high nibble and highBits the low nibble
                int lowBits = (x[i] & 255) / 16;
                int highBits = (x[i] & 255) % 16;
                bOut.write(HEX_DIGITS[lowBits]);
                bOut.write(HEX_DIGITS[highBits]);
            }
            bOut.write(39); // '\''
            setInternal(parameterIndex, bOut.toByteArray());
            return;
        }
        int numBytes = x.length;
        int pad = 2;
        boolean needsIntroducer = checkForIntroducer && connection.versionMeetsMinimum(4, 1, 0);
        if(needsIntroducer)
            pad += 7;
        ByteArrayOutputStream bOut = new ByteArrayOutputStream(numBytes + pad);
        if(needsIntroducer) {
            // the seven bytes below spell the introducer "_binary"
            bOut.write(95);
            bOut.write(98);
            bOut.write(105);
            bOut.write(110);
            bOut.write(97);
            bOut.write(114);
            bOut.write(121);
        }
        bOut.write(39); // '\''
        for(int i = 0; i < numBytes; i++) {
            byte b = x[i];
            switch(b) {
            case 0: // '\0'
                bOut.write(92);
                bOut.write(48);
                break;
            case 10: // '\n'
                bOut.write(92);
                bOut.write(110);
                break;
            case 13: // '\r'
                bOut.write(92);
                bOut.write(114);
                break;
            case 92: // '\\'
                bOut.write(92);
                bOut.write(92);
                break;
            case 39: // '\''
                bOut.write(92);
                bOut.write(39);
                break;
            case 34: // '"'
                bOut.write(92);
                bOut.write(34);
                break;
            case 26: // '\032'
                bOut.write(92);
                bOut.write(90);
                break;
            default:
                bOut.write(b);
                break;
            }
        }
        bOut.write(39); // '\''
        // in the end this also delegates to setInternal
        setInternal(parameterIndex, bOut.toByteArray());
    }
}
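As shown above, setting a value on a PreparedStatement simply records, for that parameter, whether it is null, whether it is a stream, its value, and its stream in the statement's isNull, isStream, parameterValues, and parameterStreams arrays.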
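The backslash escaping that setString applies before calling setInternal can be illustrated in isolation. The following is only a minimal sketch: the class name EscapeSketch and the method quoteAndEscape are hypothetical, and the sketch leaves out the charset handling, the ANSI-mode double-quote case, and the NO_BACKSLASH_ESCAPES branch of the driver code. It is meant for reading alongside the listing, not for building SQL by hand.

```java
final class EscapeSketch {

    /**
     * Wraps s in single quotes and backslash-escapes the characters that
     * the setString listing handles in its switch statement.
     * Hypothetical helper, not part of the driver.
     */
    static String quoteAndEscape(String s) {
        StringBuilder buf = new StringBuilder(s.length() + 2);
        buf.append('\'');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case 0:    buf.append("\\0");  break; // NUL
                case '\n': buf.append("\\n");  break;
                case '\r': buf.append("\\r");  break;
                case '\\': buf.append("\\\\"); break;
                case '\'': buf.append("\\'");  break;
                case 26:   buf.append("\\Z");  break; // Ctrl-Z
                default:   buf.append(c);
            }
        }
        buf.append('\'');
        return buf.toString();
    }

    public static void main(String[] args) {
        System.out.println(quoteAndEscape("O'Reilly"));
    }
}
```

In the driver the escaped text is then converted to bytes and stored in parameterValues, so by the time the statement is executed each parameter is already a ready-to-send byte array.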
// Adding to the batch
public void addBatch() throws SQLException {
    if(batchedArgs == null)
        batchedArgs = new ArrayList();
    for(int i = 0; i < parameterValues.length; i++)
        checkAllParametersSet(parameterValues[i], parameterStreams[i], i);
    batchedArgs.add(new BatchParams(parameterValues, parameterStreams, isStream, streamLengths, isNull));
}
// Batch parameters
class BatchParams {
    boolean isNull[];
    boolean isStream[];
    InputStream parameterStreams[];
    byte parameterStrings[][];
    int streamLengths[];

    BatchParams(byte strings[][], InputStream streams[], boolean isStreamFlags[], int lengths[], boolean isNullFlags[]) {
        isNull = null;
        isStream = null;
        parameterStreams = null;
        parameterStrings = (byte[][])null;
        streamLengths = null;
        parameterStrings = new byte[strings.length][];
        parameterStreams = new InputStream[streams.length];
        isStream = new boolean[isStreamFlags.length];
        streamLengths = new int[lengths.length];
        isNull = new boolean[isNullFlags.length];
        // copy the current parameter state so later setXxx calls do not change this batch entry
        System.arraycopy(strings, 0, parameterStrings, 0, strings.length);
        System.arraycopy(streams, 0, parameterStreams, 0, streams.length);
        System.arraycopy(isStreamFlags, 0, isStream, 0, isStreamFlags.length);
        System.arraycopy(lengths, 0, streamLengths, 0, lengths.length);
        System.arraycopy(isNullFlags, 0, isNull, 0, isNullFlags.length);
    }
}
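Why BatchParams copies the arrays instead of keeping references to them is easier to see in a small model. The sketch below is an assumption-laden simplification: BatchSnapshotSketch, Row, set, and valueAt are hypothetical names, and only the parameter values are modelled (no streams, null flags, or lengths).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BatchSnapshotSketch {

    /** Hypothetical stand-in for BatchParams: a copy of the current parameter row. */
    static final class Row {
        final byte[][] values;

        Row(byte[][] current) {
            values = new byte[current.length][];
            // Shallow copy, as in BatchParams: the slots are copied, not the byte arrays.
            System.arraycopy(current, 0, values, 0, current.length);
        }
    }

    private final byte[][] parameterValues;
    private final List<Row> batchedArgs = new ArrayList<>();

    BatchSnapshotSketch(int parameterCount) {
        parameterValues = new byte[parameterCount][];
    }

    /** Replaces the slot for a 1-based parameter index, like setInternal does. */
    void set(int parameterIndex, String value) {
        parameterValues[parameterIndex - 1] = value.getBytes(StandardCharsets.UTF_8);
    }

    /** Snapshots the current row into the batch list. */
    void addBatch() {
        batchedArgs.add(new Row(parameterValues));
    }

    String valueAt(int row, int parameterIndex) {
        return new String(batchedArgs.get(row).values[parameterIndex - 1], StandardCharsets.UTF_8);
    }

    int size() {
        return batchedArgs.size();
    }

    public static void main(String[] args) {
        BatchSnapshotSketch ps = new BatchSnapshotSketch(1);
        ps.set(1, "first");
        ps.addBatch();
        ps.set(1, "second"); // does not disturb the row already added
        ps.addBatch();
        System.out.println(ps.valueAt(0, 1) + ", " + ps.valueAt(1, 1));
    }
}
```

A shallow copy is enough here because each set call stores a new byte array in the slot rather than modifying the old one in place.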
// Adding a plain SQL string to the batch
public synchronized void addBatch(String sql) throws SQLException {
    batchHasPlainStatements = true;
    // delegate to the parent class StatementImpl
    super.addBatch(sql);
}

//StatementImpl
public synchronized void addBatch(String sql) throws SQLException {
    if(batchedArgs == null)
        batchedArgs = new ArrayList();
    if(sql != null)
        batchedArgs.add(sql);
}
// Executing the batch
// (Decompiler output, truncated: "JVM INSTR monitorenter/monitorexit" marks a synchronized block on the connection mutex,
// and "break MISSING_BLOCK_LABEL_n" is a jump the decompiler could not restructure.)
public int[] executeBatch() throws SQLException {
    checkClosed();
    if(connection.isReadOnly())
        throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"), "S1009");
    // acquire the connection mutex
    Object obj = connection.getMutex();
    JVM INSTR monitorenter ;
    if(batchedArgs == null || batchedArgs.size() == 0)
        return new int[0];
    int batchTimeout;
    batchTimeout = timeoutInMillis;
    timeoutInMillis = 0;
    resetCancelledState();
    int ai[];
    clearWarnings();
    if(batchHasPlainStatements || !connection.getRewriteBatchedStatements())
        break MISSING_BLOCK_LABEL_195;
    if(!canRewriteAsMultiValueInsertAtSqlLevel())
        break MISSING_BLOCK_LABEL_141;
    // execute the batch as batched inserts
    ai = executeBatchedInserts(batchTimeout);
    clearBatch();
    obj;
    JVM INSTR monitorexit ;
    return ai;
    if(!connection.versionMeetsMinimum(4, 1, 0) || batchHasPlainStatements || batchedArgs == null || batchedArgs.size() <= 3)
        break MISSING_BLOCK_LABEL_195;
    // execute the batch as a multi-statement PreparedStatement
    ai = executePreparedBatchAsMultiStatement(batchTimeout);
    clearBatch();
}
Now let's look at the batched insert.
protected int[] executeBatchedInserts(int batchTimeout) throws SQLException {
    Connection locallyScopedConn;
    // total number of batched parameter sets
    int numBatchedArgs;
    // number of parameter sets per execution
    int numValuesPerBatch;
    // the statement used to run each chunk
    java.sql.PreparedStatement batchedStatement;
    // parameter index within the batched statement
    int batchedParamIndex;
    int updateCountRunningTotal;
    int batchCounter;
    // timeout task for the statement
    StatementImpl.CancelTask timeoutTask;
    SQLException sqlEx;
    int updateCounts[];
    String valuesClause = getValuesClause();
    // the connection
    locallyScopedConn = connection;
    if(valuesClause == null)
        return executeBatchSerially(batchTimeout);
    // total number of batched parameter sets
    numBatchedArgs = batchedArgs.size();
    if(retrieveGeneratedKeys)
        batchedGeneratedKeys = new ArrayList(numBatchedArgs);
    // compute the number of parameter sets per execution
    numValuesPerBatch = computeBatchSize(numBatchedArgs);
    if(numBatchedArgs < numValuesPerBatch)
        numValuesPerBatch = numBatchedArgs;
    batchedStatement = null;
    batchedParamIndex = 1;
    updateCountRunningTotal = 0;
    int numberToExecuteAsMultiValue = 0;
    batchCounter = 0;
    timeoutTask = null;
    sqlEx = null;
    updateCounts = new int[numBatchedArgs];
    for(int i = 0; i < batchedArgs.size(); i++)
        updateCounts[i] = 1;
    batchedStatement = prepareBatchedInsertSQL((ConnectionImpl)locallyScopedConn, numValuesPerBatch);
    if(connection.getEnableQueryTimeouts() && batchTimeout != 0 && connection.versionMeetsMinimum(5, 0, 0)) {
        timeoutTask = new StatementImpl.CancelTask(this, (StatementImpl)batchedStatement);
        ConnectionImpl.getCancelTimer().schedule(timeoutTask, batchTimeout);
    }
    int numberToExecuteAsMultiValue;
    if(numBatchedArgs < numValuesPerBatch)
        numberToExecuteAsMultiValue = numBatchedArgs;
    else
        numberToExecuteAsMultiValue = numBatchedArgs / numValuesPerBatch;
    int numberArgsToExecute = numberToExecuteAsMultiValue * numValuesPerBatch;
    for(int i = 0; i < numberArgsToExecute; i++) {
        if(i != 0 && i % numValuesPerBatch == 0) {
            try {
                updateCountRunningTotal += batchedStatement.executeUpdate();
            } catch(SQLException ex) {
                sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
            }
            getBatchedGeneratedKeys(batchedStatement);
            batchedStatement.clearParameters();
            batchedParamIndex = 1;
        }
        batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, batchedArgs.get(batchCounter++));
    }
    try {
        updateCountRunningTotal += batchedStatement.executeUpdate();
    } catch(SQLException ex) {
        sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
    }
    getBatchedGeneratedKeys(batchedStatement);
    numValuesPerBatch = numBatchedArgs - batchCounter;
    int ai[];
    // run the remaining parameter sets as one last, smaller chunk
    if(numValuesPerBatch > 0) {
        batchedStatement = prepareBatchedInsertSQL((ConnectionImpl)locallyScopedConn, numValuesPerBatch);
        if(timeoutTask != null)
            timeoutTask.toCancel = (StatementImpl)batchedStatement;
        for(batchedParamIndex = 1; batchCounter < numBatchedArgs; batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, batchedArgs.get(batchCounter++)));
        try {
            // execute the update
            updateCountRunningTotal += batchedStatement.executeUpdate();
        } catch(SQLException ex) {
            sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
        }
        getBatchedGeneratedKeys(batchedStatement);
    }
    ai = updateCounts;
    if(batchedStatement != null)
        batchedStatement.close();
    return ai;
}

Next, let's look at executing the update.
public int executeUpdate() throws SQLException {
    return executeUpdate(true, false);
}

protected int executeUpdate(boolean clearBatchedGeneratedKeysAndWarnings, boolean isBatch) throws SQLException {
    if(clearBatchedGeneratedKeysAndWarnings) {
        clearWarnings();
        batchedGeneratedKeys = null;
    }
    return executeUpdate(parameterValues, parameterStreams, isStream, streamLengths, isNull, isBatch);
}

// Executing the update
protected int executeUpdate(byte batchedParameterStrings[][], InputStream batchedParameterStreams[], boolean batchedIsStream[], int batchedStreamLengths[], boolean batchedIsNull[], boolean isReallyBatch) throws SQLException {
    checkClosed();
    ConnectionImpl locallyScopedConn = connection;
    if(locallyScopedConn.isReadOnly())
        throw SQLError.createSQLException(Messages.getString("PreparedStatement.34") + Messages.getString("PreparedStatement.35"), "S1009", getExceptionInterceptor());
    if(firstCharOfStmt == 'S' && isSelectQuery())
        throw SQLError.createSQLException(Messages.getString("PreparedStatement.37"), "01S03", getExceptionInterceptor());
    if(results != null && !locallyScopedConn.getHoldResultsOpenOverStatementClose())
        results.realClose(false);
    // the result
    ResultSetInternalMethods rs = null;
    synchronized(locallyScopedConn.getMutex()) {
        // fill the packet to send
        Buffer sendPacket = fillSendPacket(batchedParameterStrings, batchedParameterStreams, batchedIsStream, batchedStreamLengths);
        String oldCatalog = null;
        if(!locallyScopedConn.getCatalog().equals(currentCatalog)) {
            oldCatalog = locallyScopedConn.getCatalog();
            locallyScopedConn.setCatalog(currentCatalog);
        }
        if(locallyScopedConn.useMaxRows())
            executeSimpleNonQuery(locallyScopedConn, "SET OPTION SQL_SELECT_LIMIT=DEFAULT");
        boolean oldInfoMsgState = false;
        if(retrieveGeneratedKeys) {
            oldInfoMsgState = locallyScopedConn.isReadInfoMsgEnabled();
            locallyScopedConn.setReadInfoMsgEnabled(true);
        }
        // internal execution; we saw this in an earlier article, so today we only revisit the key parts
        rs = executeInternal(-1, sendPacket, false, false, null, isReallyBatch);
        if(retrieveGeneratedKeys) {
            locallyScopedConn.setReadInfoMsgEnabled(oldInfoMsgState);
            rs.setFirstCharOfQuery(firstCharOfStmt);
        }
        if(oldCatalog != null)
            locallyScopedConn.setCatalog(oldCatalog);
    }
    results = rs;
    updateCount = rs.getUpdateCount();
    if(containsOnDuplicateKeyUpdateInSQL() && compensateForOnDuplicateKeyUpdate && (updateCount == 2L || updateCount == 0L))
        updateCount = 1L;
    int truncatedUpdateCount = 0;
    if(updateCount > 2147483647L)
        truncatedUpdateCount = 2147483647;
    else
        truncatedUpdateCount = (int)updateCount;
    lastInsertId = rs.getUpdateID();
    return truncatedUpdateCount;
}
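The chunking arithmetic in executeBatchedInserts above is easy to lose among the decompiled locals, so here is a minimal sketch of it on its own. BatchChunkSketch and planChunks are hypothetical names; the per-batch size is taken as an input because the listing does not show how computeBatchSize derives it.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchChunkSketch {

    /**
     * Returns the number of parameter sets sent in each executeUpdate call:
     * as many full chunks as fit, then one smaller chunk for the remainder.
     */
    static List<Integer> planChunks(int numBatchedArgs, int numValuesPerBatch) {
        if (numBatchedArgs <= 0 || numValuesPerBatch <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        // Same clamp as the listing: never plan a chunk larger than the whole batch.
        if (numBatchedArgs < numValuesPerBatch) {
            numValuesPerBatch = numBatchedArgs;
        }
        List<Integer> chunks = new ArrayList<>();
        int fullChunks = numBatchedArgs / numValuesPerBatch;
        for (int i = 0; i < fullChunks; i++) {
            chunks.add(numValuesPerBatch);
        }
        // Corresponds to "numValuesPerBatch = numBatchedArgs - batchCounter" in the listing.
        int remainder = numBatchedArgs - fullChunks * numValuesPerBatch;
        if (remainder > 0) {
            chunks.add(remainder);
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(planChunks(10, 4)); // prints [4, 4, 2]
    }
}
```

The remainder chunk is the reason the listing calls prepareBatchedInsertSQL a second time: the last statement needs fewer value groups than the full-size one.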
// Executing the update internally
protected ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, Buffer sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly, Field metadataFromCache[], boolean isBatch) throws SQLException {
    ConnectionImpl locallyScopedConnection;
    resetCancelledState();
    locallyScopedConnection = connection;
    numberOfExecutions++;
    if(!doPingInstead)
        break MISSING_BLOCK_LABEL_36;
    doPingInstead();
    return results;
    StatementImpl.CancelTask timeoutTask = null;
    ResultSetInternalMethods rs;
    if(locallyScopedConnection.getEnableQueryTimeouts() && timeoutInMillis != 0 && locallyScopedConnection.versionMeetsMinimum(5, 0, 0)) {
        timeoutTask = new StatementImpl.CancelTask(this, this);
        // the connection's cancel Timer runs timeoutTask after timeoutInMillis milliseconds
        ConnectionImpl.getCancelTimer().schedule(timeoutTask, timeoutInMillis);
    }
    // execute the query and return the result set
    rs = locallyScopedConnection.execSQL(this, null, maxRowsToRetrieve, sendPacket, resultSetType, resultSetConcurrency, createStreamingResultSet, currentCatalog, metadataFromCache, isBatch);
    if(timeoutTask != null) {
        timeoutTask.cancel();
        if(timeoutTask.caughtWhileCancelling != null)
            throw timeoutTask.caughtWhileCancelling;
        timeoutTask = null;
    }
    synchronized(cancelTimeoutMutex) {
        if(wasCancelled) {
            SQLException cause = null;
            if(wasCancelledByTimeout)
                cause = new MySQLTimeoutException();
            else
                cause = new MySQLStatementCancelledException();
            resetCancelledState();
            throw cause;
        }
    }
    if(timeoutTask != null)
        timeoutTask.cancel();
    break MISSING_BLOCK_LABEL_242;
    Exception exception1;
    exception1;
    if(timeoutTask != null)
        timeoutTask.cancel();
    throw exception1;
    return rs;
    NullPointerException npe;
    npe;
    checkClosed();
    throw npe;
}
What does the following part do?
if(locallyScopedConnection.getEnableQueryTimeouts() && timeoutInMillis != 0 && locallyScopedConnection.versionMeetsMinimum(5, 0, 0)) {
    timeoutTask = new StatementImpl.CancelTask(this, this);
    // the connection's cancel Timer runs timeoutTask after timeoutInMillis milliseconds
    ConnectionImpl.getCancelTimer().schedule(timeoutTask, timeoutInMillis);
}
//StatementImpl
public class StatementImpl implements com.mysql.jdbc.Statement {
    class CancelTask extends TimerTask {
        public void run() {
            Thread cancelThread = new Thread() {
                public void run() {
                    com.mysql.jdbc.Connection cancelConn = null;
                    Statement cancelStmt = null;
                    try {
                        synchronized(cancelTimeoutMutex) {
                            // duplicate the connection
                            cancelConn = connection.duplicate();
                            // create a Statement on the duplicate and kill the running query
                            cancelStmt = cancelConn.createStatement();
                            cancelStmt.execute("KILL QUERY " + connectionId);
                            toCancel.wasCancelled = true;
                            toCancel.wasCancelledByTimeout = true;
                        }
                    } catch(SQLException sqlEx) {
                        caughtWhileCancelling = sqlEx;
                    } catch(NullPointerException npe) {
                    } finally {
                        if(cancelStmt != null)
                            try {
                                cancelStmt.close();
                            } catch(SQLException sqlEx) {
                                throw new RuntimeException(sqlEx.toString());
                            }
                        if(cancelConn != null)
                            try {
                                cancelConn.close();
                            } catch(SQLException sqlEx) {
                                throw new RuntimeException(sqlEx.toString());
                            }
                    }
                }
            };
            cancelThread.start();
        }

        long connectionId;
        SQLException caughtWhileCancelling;
        StatementImpl toCancel;

        CancelTask(StatementImpl cancellee) throws SQLException {
            connectionId = 0L;
            caughtWhileCancelling = null;
            connectionId = connection.getIO().getThreadId();
            toCancel = cancellee;
        }
    }
}
// Creating the Statement
cancelStmt = cancelConn.createStatement();
//ConnectionImpl
public Statement createStatement() throws SQLException {
    // 1003 is ResultSet.TYPE_FORWARD_ONLY, 1007 is ResultSet.CONCUR_READ_ONLY
    return createStatement(1003, 1007);
}

public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
    checkClosed();
    StatementImpl stmt = new StatementImpl(this, database);
    stmt.setResultSetType(resultSetType);
    stmt.setResultSetConcurrency(resultSetConcurrency);
    return stmt;
}

// the connection's cancel Timer runs timeoutTask after timeoutInMillis milliseconds
ConnectionImpl.getCancelTimer().schedule(timeoutTask, timeoutInMillis);

//ConnectionImpl
protected static Timer getCancelTimer() {
    //private static Timer cancelTimer;
    return cancelTimer;
}
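From this part we can see that when a query timeout applies (query timeouts are enabled, timeoutInMillis is non-zero, and the server version is at least 5.0.0), a StatementImpl.CancelTask (a TimerTask) is created and scheduled by ConnectionImpl's cancelTimer (a Timer). If the task fires, it issues KILL QUERY over a duplicated connection.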
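The schedule-then-cancel pattern described above can be tried out without a database. This is a minimal sketch under stated assumptions: QueryTimeoutSketch and runWithTimeout are hypothetical names, the "query" is simulated by waiting on a latch, and counting the latch down stands in for the KILL QUERY statement that the real CancelTask sends.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class QueryTimeoutSketch {

    // One shared daemon timer, in the spirit of ConnectionImpl's static cancelTimer.
    private static final Timer CANCEL_TIMER = new Timer("cancel-timer", true);

    /**
     * Runs a simulated query that lasts workMillis unless it is "killed" first.
     * Returns true if the timeout task fired and cancelled the query.
     */
    static boolean runWithTimeout(long workMillis, long timeoutMillis) {
        final AtomicBoolean wasCancelled = new AtomicBoolean(false);
        final CountDownLatch killed = new CountDownLatch(1);

        TimerTask timeoutTask = new TimerTask() {
            @Override
            public void run() {
                wasCancelled.set(true); // mirrors toCancel.wasCancelled = true
                killed.countDown();     // stands in for "KILL QUERY <id>"
            }
        };
        CANCEL_TIMER.schedule(timeoutTask, timeoutMillis);

        try {
            // Simulated query: ends after workMillis, or earlier if it is killed.
            killed.await(workMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // As in executeInternal: always cancel the task once the query returns.
            timeoutTask.cancel();
        }
        return wasCancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("fast query cancelled: " + runWithTimeout(10, 5000));
        System.out.println("slow query cancelled: " + runWithTimeout(5000, 10));
    }
}
```

Cancelling the task in a finally block matters because the timer is shared: a task left scheduled after its query has finished could otherwise fire later and kill an unrelated query on the same connection.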
Back in the executeInternal method, let's look at how the query is executed and the result set returned. This is done by calling ConnectionImpl's execSQL.
//ConnectionImpl
ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field cachedMetadata[], boolean isBatch) throws SQLException {
    Object obj = mutex;
    JVM INSTR monitorenter ;
    long queryStartTime;
    int endOfQueryPacketPosition;
    queryStartTime = 0L;
    endOfQueryPacketPosition = 0;
    if(packet != null)
        endOfQueryPacketPosition = packet.getPosition();
    if(getGatherPerformanceMetrics())
        queryStartTime = System.currentTimeMillis();
    lastQueryFinishedTime = 0L;
    if(!failedOver || !autoCommit || isBatch || !shouldFallBack() || executingFailoverReconnect)
        break MISSING_BLOCK_LABEL_151;
    executingFailoverReconnect = true;
    // create a MysqlIO; we covered this in the earlier ServerPreparedStatement article
    createNewIO(true);
    String connectedHost = io.getHost();
    if(connectedHost != null && hostList.get(0).equals(connectedHost)) {
        failedOver = false;
        queriesIssuedFailedOver = 0L;
        setReadOnlyInternal(false);
    }
    if((getHighAvailability() || failedOver) && (autoCommit || getAutoReconnectForPools()) && needsPing && !isBatch)
        try {
            pingInternal(false, 0);
            needsPing = false;
        } catch(Exception Ex) {
            createNewIO(true);
        }
    ResultSetInternalMethods resultsetinternalmethods1;
    if(packet != null)
        break MISSING_BLOCK_LABEL_267;
    String encoding = null;
    if(getUseUnicode())
        encoding = getEncoding();
    // the key line: MysqlIO executes the query
    resultsetinternalmethods1 = io.sqlQueryDirect(callingStatement, sql, encoding, null, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata);
    return resultsetinternalmethods1;
    ResultSetInternalMethods resultsetinternalmethods;
    try {
        resultsetinternalmethods = io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata);
    }
    return resultsetinternalmethods;
}
Now let's look at how MysqlIO executes the query.
//MysqlIO
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field cachedMetadata[]) throws Exception {
    statementExecutionDepth++;
    // the result set to return
    ResultSetInternalMethods resultsetinternalmethods;
    if(statementInterceptors == null)
        break MISSING_BLOCK_LABEL_47;
    // let the statement interceptors handle the query first
    ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPre(query, callingStatement);
    if(interceptedResults == null)
        break MISSING_BLOCK_LABEL_47;
    resultsetinternalmethods = interceptedResults;
    statementExecutionDepth--;
    return resultsetinternalmethods;
    ResultSetInternalMethods resultsetinternalmethods1;
    long queryStartTime = 0L;
    long queryEndTime = 0L;
    if(query != null) {
        int packLength = 5 + query.length() * 2 + 2;
        String statementComment = connection.getStatementComment();
        byte commentAsBytes[] = null;
        if(statementComment != null) {
            commentAsBytes = StringUtils.getBytes(statementComment, null, characterEncoding, connection.getServerCharacterEncoding(), connection.parserKnowsUnicode(), getExceptionInterceptor());
            packLength += commentAsBytes.length;
            packLength += 6;
        }
        if(sendPacket == null)
            sendPacket = new Buffer(packLength);
        else
            sendPacket.clear();
        sendPacket.writeByte((byte)3);
        if(commentAsBytes != null) {
            // add the statement comment to the query packet
            sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES);
            sendPacket.writeBytesNoNull(commentAsBytes);
            sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES);
        }
        if(characterEncoding != null) {
            // if an encoding is given, encode the query and write it into the packet
            if(platformDbCharsetMatches)
                sendPacket.writeStringNoNull(query, characterEncoding, connection.getServerCharacterEncoding(), connection.parserKnowsUnicode(), connection);
            else if(StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA"))
                sendPacket.writeBytesNoNull(query.getBytes());
            else
                sendPacket.writeStringNoNull(query, characterEncoding, connection.getServerCharacterEncoding(), connection.parserKnowsUnicode(), connection);
        } else {
            sendPacket.writeStringNoNull(query);
        }
        queryPacket = sendPacket;
    }
    byte queryBuf[] = null;
    int oldPacketPosition = 0;
    if(needToGrabQueryFromPacket) {
        queryBuf = queryPacket.getByteBuffer();
        oldPacketPosition = queryPacket.getPosition();
        queryStartTime = getCurrentTimeNanosOrMillis();
    }
    if(autoGenerateTestcaseScript) {
        String testcaseQuery = null;
        if(query != null)
            testcaseQuery = query;
        else
            testcaseQuery = new String(queryBuf, 5, oldPacketPosition - 5);
        StringBuffer debugBuf = new StringBuffer(testcaseQuery.length() + 32);
        connection.generateConnectionCommentBlock(debugBuf);
        debugBuf.append(testcaseQuery);
        debugBuf.append(';');
        connection.dumpTestcaseQuery(debugBuf.toString());
    }
    // MysqlIO sends the query packet to the server and receives the result packet
    Buffer resultPacket = sendCommand(3, null, queryPacket, false, null, 0);
    long fetchBeginTime = 0L;
    long fetchEndTime = 0L;
    String profileQueryToLog = null;
    boolean queryWasSlow = false;
    if(profileSql || logSlowQueries) {
        queryEndTime = System.currentTimeMillis();
        boolean shouldExtractQuery = false;
        if(profileSql)
            shouldExtractQuery = true;
        else if(logSlowQueries) {
            long queryTime = queryEndTime - queryStartTime;
            boolean logSlow = false;
            if(useAutoSlowLog) {
                logSlow = queryTime > (long)connection.getSlowQueryThresholdMillis();
            } else {
                logSlow = connection.isAbonormallyLongQuery(queryTime);
                connection.reportQueryTime(queryTime);
            }
            if(logSlow) {
                shouldExtractQuery = true;
                queryWasSlow = true;
            }
        }
        if(shouldExtractQuery) {
            boolean truncated = false;
            int extractPosition = oldPacketPosition;
            if(oldPacketPosition > connection.getMaxQuerySizeToLog()) {
                extractPosition = connection.getMaxQuerySizeToLog() + 5;
                truncated = true;
            }
            profileQueryToLog = new String(queryBuf, 5, extractPosition - 5);
            if(truncated)
                profileQueryToLog = profileQueryToLog + Messages.getString("MysqlIO.25");
        }
        fetchBeginTime = queryEndTime;
    }
    // read the results out of resultPacket
    ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, false, -1L, cachedMetadata);
    if(queryWasSlow && !serverQueryWasSlow) {
        StringBuffer mesgBuf = new StringBuffer(48 + profileQueryToLog.length());
        mesgBuf.append(Messages.getString("MysqlIO.SlowQuery", new Object[] {
            new Long(slowQueryThreshold), queryTimingUnits, new Long(queryEndTime - queryStartTime)
        }));
        mesgBuf.append(profileQueryToLog);
        ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(connection);
        eventSink.consumeEvent(new ProfilerEvent((byte)6, "", catalog, connection.getId(), callingStatement == null ? 999 : callingStatement.getId(), ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (int)(queryEndTime - queryStartTime), queryTimingUnits, null, new Throwable(), mesgBuf.toString()));
        if(connection.getExplainSlowQueries())
            if(oldPacketPosition < 1048576)
                explainSlowQuery(queryPacket.getBytes(5, oldPacketPosition - 5), profileQueryToLog);
            else
                connection.getLog().logWarn(Messages.getString("MysqlIO.28") + 1048576 + Messages.getString("MysqlIO.29"));
    }
    if(logSlowQueries) {
        ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(connection);
        if(queryBadIndexUsed)
            eventSink.consumeEvent(new ProfilerEvent((byte)6, "", catalog, connection.getId(), callingStatement == null ? 999 : callingStatement.getId(), ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), queryEndTime - queryStartTime, queryTimingUnits, null, new Throwable(), Messages.getString("MysqlIO.33") + profileQueryToLog));
        if(queryNoIndexUsed)
            eventSink.consumeEvent(new ProfilerEvent((byte)6, "", catalog, connection.getId(), callingStatement == null ? 999 : callingStatement.getId(), ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), queryEndTime - queryStartTime, queryTimingUnits, null, new Throwable(), Messages.getString("MysqlIO.35") + profileQueryToLog));
        if(serverQueryWasSlow)
            eventSink.consumeEvent(new ProfilerEvent((byte)6, "", catalog, connection.getId(), callingStatement == null ? 999 : callingStatement.getId(), ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), queryEndTime - queryStartTime, queryTimingUnits, null, new Throwable(), Messages.getString("MysqlIO.ServerSlowQuery") + profileQueryToLog));
    }
    if(profileSql) {
        fetchEndTime = getCurrentTimeNanosOrMillis();
        ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(connection);
        eventSink.consumeEvent(new ProfilerEvent((byte)3, "", catalog, connection.getId(), callingStatement == null ? 999 : callingStatement.getId(), ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), queryEndTime - queryStartTime, queryTimingUnits, null, new Throwable(), profileQueryToLog));
        eventSink.consumeEvent(new ProfilerEvent((byte)5, "", catalog, connection.getId(), callingStatement == null ? 999 : callingStatement.getId(), ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), fetchEndTime - fetchBeginTime, queryTimingUnits, null, new Throwable(), null));
    }
    if(hadWarnings)
        scanForAndThrowDataTruncation();
    if(statementInterceptors != null) {
        ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(query, callingStatement, rs);
        if(interceptedResults != null)
            rs = interceptedResults;
    }
    resultsetinternalmethods1 = rs;
    statementExecutionDepth--;
    return resultsetinternalmethods1;
}

// read the result sets out of the buffer
ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field metadataFromCache[]) throws SQLException {
    resultPacket.setPosition(resultPacket.getPosition() - 1);
    ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);
    ResultSetImpl currentResultSet = topLevelResultSet;
    boolean checkForMoreResults = (clientParam & 131072L) != 0L;
    boolean serverHasMoreResults = (serverStatus & 8) != 0;
    if(serverHasMoreResults && streamResults) {
        if(topLevelResultSet.getUpdateCount() != -1L)
            tackOnMoreStreamingResults(topLevelResultSet);
        reclaimLargeReusablePacket();
        return topLevelResultSet;
    }
    for(boolean moreRowSetsExist = checkForMoreResults & serverHasMoreResults; moreRowSetsExist; moreRowSetsExist = (serverStatus & 8) != 0) {
        Buffer fieldPacket = checkErrorPacket();
        fieldPacket.setPosition(0);
        // read the next result set
        ResultSetImpl newResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, fieldPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);
        currentResultSet.setNextResultSet(newResultSet);
        currentResultSet = newResultSet;
    }
    if(!streamResults)
        clearInputStream();
    reclaimLargeReusablePacket();
    return topLevelResultSet;
}
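As the code above shows, batching works by wrapping the PreparedStatement's parameter information (whether each parameter is a stream, whether it is null, the parameter values, and the parameter streams) into a BatchParams object and adding it to the batchedArgs List. Executing the batch then sends the parameter sets held in batchedArgs to the server one after another through MysqlIO.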
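The flow summarized above can be sketched with a small, self-contained model. This is a simplified illustration and not the driver's code: `BatchSketch`, `MiniStatement`, `render`, and the `sent` list (which stands in for the MysqlIO round trip) are hypothetical names, and the snapshot copy inside `BatchParams` as well as the update count of 1 per entry are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of client-side batching; NOT Connector/J's real code.
public class BatchSketch {

    // Snapshot of one bound parameter set, loosely modeled on the driver's BatchParams.
    static final class BatchParams {
        final byte[][] parameterValues;
        final boolean[] isNull;

        BatchParams(byte[][] values, boolean[] nulls) {
            // Copy the outer arrays so later setString calls cannot change a queued entry.
            this.parameterValues = values.clone();
            this.isNull = nulls.clone();
        }
    }

    // Hypothetical stand-in for a prepared statement; "sent" replaces the MysqlIO round trip.
    static final class MiniStatement {
        private final byte[][] parameterValues;
        private final boolean[] isNull;
        private final List<BatchParams> batchedArgs = new ArrayList<>();
        final List<String> sent = new ArrayList<>();

        MiniStatement(int parameterCount) {
            parameterValues = new byte[parameterCount][];
            isNull = new boolean[parameterCount];
        }

        // Records the value (or the null flag) for a 1-based parameter index.
        void setString(int parameterIndex, String value) {
            isNull[parameterIndex - 1] = (value == null);
            parameterValues[parameterIndex - 1] =
                    (value == null) ? null : value.getBytes(StandardCharsets.UTF_8);
        }

        // Queues a snapshot of the current bindings.
        void addBatch() {
            batchedArgs.add(new BatchParams(parameterValues, isNull));
        }

        // "Sends" each queued parameter set in order, then empties the queue.
        int[] executeBatch() {
            int[] updateCounts = new int[batchedArgs.size()];
            for (int i = 0; i < updateCounts.length; i++) {
                sent.add(render(batchedArgs.get(i)));
                updateCounts[i] = 1; // assumed: each entry affects one row
            }
            batchedArgs.clear();
            return updateCounts;
        }

        // Joins one parameter set into a comma-separated string for inspection.
        private static String render(BatchParams params) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < params.parameterValues.length; i++) {
                if (i > 0) sb.append(',');
                sb.append(params.isNull[i]
                        ? "NULL"
                        : new String(params.parameterValues[i], StandardCharsets.UTF_8));
            }
            return sb.toString();
        }
    }

    // Binds two parameter sets, batches them, and returns what was "sent".
    public static List<String> demo() {
        MiniStatement stmt = new MiniStatement(2);
        stmt.setString(1, "alice");
        stmt.setString(2, null);
        stmt.addBatch();
        stmt.setString(1, "bob");
        stmt.setString(2, "admin");
        stmt.addBatch();
        stmt.executeBatch();
        return stmt.sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each `addBatch` call stores its own snapshot, rebinding the parameters for the second row leaves the first queued entry untouched, which is what lets one statement object be reused for every row in the batch.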
// Clear the batch
public synchronized void clearBatch() throws SQLException {
    batchHasPlainStatements = false;
    super.clearBatch();
}
// StatementImpl
public synchronized void clearBatch() throws SQLException {
    // Clear the parameter List
    if(batchedArgs != null)
        batchedArgs.clear();
}
// Clear the parameters
public synchronized void clearParameters() throws SQLException {
    checkClosed();
    for(int i = 0; i < parameterValues.length; i++) {
        // Reset the statement's descriptors: parameter value, parameter stream, parameter type, isNull and isStream
        parameterValues[i] = null;
        parameterStreams[i] = null;
        isStream[i] = false;
        isNull[i] = false;
        parameterTypes[i] = 0;
    }
}
// Close the Statement
public synchronized void close() throws SQLException {
    // Delegate to realClose
    realClose(true, true);
}
// Close the Statement
protected void realClose(boolean calledExplicitly, boolean closeOpenResults) throws SQLException {
    if(useUsageAdvisor && numberOfExecutions <= 1) {
        String message = Messages.getString("PreparedStatement.43");
        eventSink.consumeEvent(new ProfilerEvent((byte)0, "", currentCatalog, connectionId, getId(), -1, System.currentTimeMillis(), 0L, Constants.MILLIS_I18N, null, pointOfOrigin, message));
    }
    super.realClose(calledExplicitly, closeOpenResults);
    dbmd = null;
    originalSql = null;
    staticSqlStrings = (byte[][])null;
    parameterValues = (byte[][])null;
    parameterStreams = null;
    isStream = null;
    streamLengths = null;
    isNull = null;
    streamConvertBuf = null;
    parameterTypes = null;
}
As the realClose method shows, closing a Statement sets its parameter types, parameter streams, parameter values, SQL, DatabaseMetaData, and related fields to null.
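The difference between clearing parameters and closing can be illustrated with another small model. Again this is only a sketch under stated assumptions: `CloseSketch`, `setBytes` as written here, and `isBound` are hypothetical, and `IllegalStateException` stands in for the `SQLException` that the driver's `checkClosed` would raise.

```java
import java.util.Arrays;

// Hypothetical model of statement cleanup; NOT Connector/J's real code.
public class CloseSketch implements AutoCloseable {
    private byte[][] parameterValues;
    private boolean[] isNull;
    private String originalSql;

    public CloseSketch(String sql, int parameterCount) {
        this.originalSql = sql;
        this.parameterValues = new byte[parameterCount][];
        this.isNull = new boolean[parameterCount];
    }

    // Stand-in for the driver's checkClosed; the real method throws SQLException.
    private void checkClosed() {
        if (parameterValues == null) {
            throw new IllegalStateException("statement is closed");
        }
    }

    // Binds a value (or marks the slot as null) for a 1-based parameter index.
    public void setBytes(int parameterIndex, byte[] value) {
        checkClosed();
        parameterValues[parameterIndex - 1] = value;
        isNull[parameterIndex - 1] = (value == null);
    }

    // True when the slot holds a value or was explicitly set to null.
    public boolean isBound(int parameterIndex) {
        checkClosed();
        return parameterValues[parameterIndex - 1] != null || isNull[parameterIndex - 1];
    }

    // Resets every slot but keeps the arrays, so the statement stays usable.
    public void clearParameters() {
        checkClosed();
        Arrays.fill(parameterValues, null);
        Arrays.fill(isNull, false);
    }

    // Drops all references, so any later use fails in checkClosed.
    @Override
    public void close() {
        originalSql = null;
        parameterValues = null;
        isNull = null;
    }

    // Walks through bind, clear, close, and a rejected bind after close.
    public static String demo() {
        CloseSketch stmt = new CloseSketch("INSERT INTO t VALUES (?)", 1);
        stmt.setBytes(1, new byte[] {1});
        boolean boundBefore = stmt.isBound(1);
        stmt.clearParameters();
        boolean boundAfter = stmt.isBound(1);
        stmt.close();
        String afterClose;
        try {
            stmt.setBytes(1, new byte[] {2});
            afterClose = "accepted";
        } catch (IllegalStateException e) {
            afterClose = "rejected";
        }
        return boundBefore + "/" + boundAfter + "/" + afterClose;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model `clearParameters` only empties the slots, while `close` releases the arrays themselves, mirroring the split between the per-element reset loop and the field-level nulling in the listing above.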