1 数据库服务器连接
1.1 客户端连接处理(JDBCDriver)
1.1.1 描述
(1) 客户端通过JDBCDriver类创建JDBCConnection建立连接,最终通过ClientConnectionHTTP、ClientConnection类创建一个与服务器类Server的socket连接。
(2) 上文已经描述了数据库服务器启动的时候通过Server类、HsqlSocketFactory类创建了一个ServerSocket,同时通过accept方法循环监听客户端发起的请求。
(3) 当Server接收到客户端的请求时,创建一个新的线程并将该线程加入到线程组中;从代码看是创建了一个ServerConnection,此类实现了Runnable接口。
1.1.2 JDBCDriver
public Connection connect(String url, Properties info) throws SQLException { if (url.regionMatches(true, 0, DatabaseURL.S_URL_INTERNAL, 0, DatabaseURL.S_URL_INTERNAL.length())) { JDBCConnection conn = (JDBCConnection) threadConnection.get(); if (conn == null) { returnnull; } return conn; } return getConnection(url, info);//此方法实现对JDBCConnectio类的创建 }
1.1.3 JDBCConnection
/**
 * Creates a connection from the parsed URL properties.
 *
 * <p>Reads user, password, connection type, host, port, path and database
 * from {@code props}, defaults the user to {@code "SA"} and the password to
 * the empty string, and then builds {@code sessionProxy} according to the
 * connection type: an in-process session through {@code DatabaseManager},
 * a {@code ClientConnection} for the HSQL(S) types, or a
 * {@code ClientConnectionHTTP} for the HTTP(S) types.
 *
 * @param props the connection properties
 * @throws SQLException if the connection type is not handled, or if an
 *                      {@code HsqlException} is raised while connecting
 */
public JDBCConnection(HsqlProperties props) throws SQLException {

    String user     = props.getProperty("user");
    String password = props.getProperty("password");
    String connType = props.getProperty("connection_type");
    String host     = props.getProperty("host");
    int    port     = props.getIntegerProperty("port", 0);
    String path     = props.getProperty("path");
    String database = props.getProperty("database");

    // NOTE(review): connType is compared by reference (==) here and below;
    // this only matches if props carries the very same DatabaseURL constant
    // instances - confirm against the code that fills props.
    boolean isTLS = (connType == DatabaseURL.S_HSQLS
                     || connType == DatabaseURL.S_HTTPS);
    boolean isTLSWrapper =
        props.isPropertyTrue(HsqlDatabaseProperties.url_tls_wrapper, false);

    // the TLS wrapper flag is only honoured for TLS connection types
    isTLSWrapper &= isTLS;

    if (user == null) {
        user = "SA";
    }

    if (password == null) {
        password = "";
    }

    Calendar cal         = Calendar.getInstance();
    int      zoneSeconds = HsqlDateTime.getZoneSeconds(cal);

    try {
        if (DatabaseURL.isInProcessDatabaseType(connType)) {

            /**
             * @todo- fredt - this should be the only static reference to
             * a core class (apart form references to the Type package)
             * from the jdbc package - we might make it dynamic
             */
            sessionProxy = DatabaseManager.newSession(connType, database,
                    user, password, props, null, zoneSeconds);
        } else if (connType == DatabaseURL.S_HSQL
                   || connType == DatabaseURL.S_HSQLS) {
            sessionProxy = new ClientConnection(host, port, path,
                    database, isTLS, isTLSWrapper, user, password,
                    zoneSeconds);
            isNetConn = true;
        } else if (connType == DatabaseURL.S_HTTP
                   || connType == DatabaseURL.S_HTTPS) {
            sessionProxy = new ClientConnectionHTTP(host, port, path,
                    database, isTLS, isTLSWrapper, user, password,
                    zoneSeconds);
            isNetConn = true;
        } else {    // alias: type not yet implemented
            throw JDBCUtil.invalidArgument(connType);
        }

        sessionProxy.setJDBCConnection(this);

        connProperties   = props;
        clientProperties = sessionProxy.getClientProperties();

        if (connProperties != null) {
            isCloseResultSet = connProperties.isPropertyTrue(
                HsqlDatabaseProperties.url_close_result, false);
            isUseColumnName = connProperties.isPropertyTrue(
                HsqlDatabaseProperties.url_get_column_name, true);
        }
    } catch (HsqlException e) {
        throw JDBCUtil.sqlException(e);
    }
}
1.1.4 ClientConnection->构造方法
/** * Establishes a connection to the server. */ public ClientConnection(String host, int port, String path, String database, boolean isTLS, boolean isTLSWrapper, String user, String password, int timeZoneSeconds) { this.host = host; this.port = port; this.path = path; this.database = database; this.isTLS = isTLS; this.isTLSWrapper = isTLSWrapper; this.zoneSeconds = timeZoneSeconds; this.zoneString = TimeZone.getDefault().getID(); initStructures(); Result login = Result.newConnectionAttemptRequest(user, password, database, zoneString, timeZoneSeconds); initConnection(host, port, isTLS); Result resultIn = execute(login); if (resultIn.isError()) { throw Error.error(resultIn); } sessionID = resultIn.getSessionId(); databaseID = resultIn.getDatabaseId(); databaseUniqueName = resultIn.getDatabaseName(); clientPropertiesString = resultIn.getMainString(); }
1.1.5 ClientConnection-> initConnection
protectedvoid initConnection(String host, int port, boolean isTLS) { openConnection(host, port, isTLS); } protectedvoid openConnection(String host, int port, boolean isTLS) { try { if (isTLSWrapper) { socket = HsqlSocketFactory.getInstance(false).createSocket(host, port); } socket = HsqlSocketFactory.getInstance(isTLS).createSocket(socket, host, port); //建立一个soket连接 socket.setTcpNoDelay(true); dataOutput = new DataOutputStream(socket.getOutputStream()); dataInput = new DataInputStream( new BufferedInputStream(socket.getInputStream())); handshake(); } catch (Exception e) { // The details from "e" should not be thrown away here. This is // very useful info for end users to diagnose the runtime problem. thrownew HsqlException(e, Error.getStateString(ErrorCode.X_08001), -ErrorCode.X_08001); } }
1.1.6 ServerConnection –>Run方法
/** * Initializes this connection and runs the request handling * loop until closed. */ publicvoid run() { int msgType; init(); if (session != null) { try { while (keepAlive) { msgType = dataInput.readByte(); if (msgType < ResultConstants.MODE_UPPER_LIMIT) { receiveResult(msgType); } else { receiveOdbcPacket((char) msgType); } } } catch (CleanExit ce) { keepAlive = false; } catch (IOException e) { // fredt - is thrown when connection drops server.printWithThread(mThread + ":disconnected " + user); } catch (HsqlException e) { // fredt - is thrown in unforeseen circumstances if (keepAlive) { server.printStackTrace(e); } } catch (Throwable e) { // fredt - is thrown in unforeseen circumstances if (keepAlive) { server.printStackTrace(e); } } } close(); }
1.1.7 ServerConnection->init初始化
/** * Initializes this connection. * <p> * Will return (not throw) if fail to initialize the connection. * </p> */ privatevoid init() { runnerThread = Thread.currentThread(); keepAlive = true; try { socket.setTcpNoDelay(true); dataInput = new DataInputStream( new BufferedInputStream(socket.getInputStream())); dataOutput = new DataOutputStream(socket.getOutputStream()); int firstInt = handshake(); switch (streamProtocol) { caseHSQL_STREAM_PROTOCOL : if (firstInt != ClientConnection .NETWORK_COMPATIBILITY_VERSION_INT) { if (firstInt == -1900000) { firstInt = -2000000; } String verString = ClientConnection.toNetCompVersionString(firstInt); throw Error.error( null, ErrorCode.SERVER_VERSIONS_INCOMPATIBLE, 0, new String[] { verString, HsqlDatabaseProperties.THIS_VERSION }); } Result resultIn = Result.newResult(dataInput, rowIn); resultIn.readAdditionalResults(session, dataInput, rowIn); Result resultOut; resultOut = setDatabase(resultIn); resultOut.write(session, dataOutput, rowOut); break; caseODBC_STREAM_PROTOCOL : odbcConnect(firstInt); break; default : // Protocol detection failures should already have been // handled. keepAlive = false; } } catch (Exception e) { // Only "unexpected" failures are caught here. // Expected failures will have been handled (by sending feedback // to user-- with an output Result for normal protocols), then // continuing. StringBuffer sb = new StringBuffer(mThread + ":Failed to connect client."); if (user != null) { sb.append(" User '" + user + "'."); } server.printWithThread(sb.toString() + " Stack trace follows."); server.printStackTrace(e); }
/**
 * Handles a connection request: resolves the target database, opens a
 * session for the requesting user and builds the response.
 *
 * @param resultIn the connection request received from the client
 * @return a connection-acknowledge response on success; an error result
 *         (with {@code session} reset to {@code null}) when an
 *         {@code HsqlException} or {@code RuntimeException} is raised
 */
private Result setDatabase(Result resultIn) {

    try {
        String dbName = resultIn.getDatabaseName();

        dbIndex = server.getDBIndex(dbName);
        dbID    = server.dbID[dbIndex];
        user    = resultIn.getMainString();

        if (!server.isSilent()) {
            String attemptMsg = mThread + ":Trying to connect user '"
                                + user + "' to DB (" + dbName + ')';

            server.printWithThread(attemptMsg);
        }

        session = DatabaseManager.newSession(dbID, user,
                                             resultIn.getSubString(),
                                             resultIn.getZoneString(),
                                             resultIn.getUpdateCount());

        if (!server.isSilent()) {
            String connectedMsg = mThread + ":Connected user '" + user
                                  + "'";

            server.printWithThread(connectedMsg);
        }

        return Result.newConnectionAcknowledgeResponse(
            session.getDatabase(), session.getId(),
            session.getDatabase().getDatabaseID());
    } catch (HsqlException e) {

        // login failed: make sure no half-initialised session is kept
        session = null;

        return Result.newErrorResult(e);
    } catch (RuntimeException e) {
        session = null;

        return Result.newErrorResult(e);
    }
}
1.1.8 ServerConnection-> receiveResult接收结果集
privatevoid receiveResult(int resultMode) throws CleanExit, IOException { boolean terminate = false; Result resultIn = Result.newResult(session, resultMode, dataInput, rowIn); resultIn.readLobResults(session, dataInput, rowIn); server.printRequest(mThread, resultIn); Result resultOut = null; switch (resultIn.getType()) { case ResultConstants.CONNECT : { resultOut = setDatabase(resultIn); break; } case ResultConstants.DISCONNECT : { resultOut = Result.updateZeroResult; terminate = true; break; } case ResultConstants.RESETSESSION : { session.resetSession(); resultOut = Result.updateZeroResult; break; } case ResultConstants.EXECUTE_INVALID : { resultOut = Result.newErrorResult(Error.error(ErrorCode.X_07502)); break; } default : { resultOut = session.execute(resultIn); break; } } resultOut.write(session, dataOutput, rowOut); rowOut.reset(mainBuffer); rowIn.resetRow(mainBuffer.length); if (terminate) { throwcleanExit; } }
1.1.9 Session ->execute返回结果构造
/** * Executes the command encapsulated by the cmd argument. * * @param cmd the command to execute * @return the result of executing the command */ publicsynchronized Result execute(Result cmd) { Object[] pvals = (Object[]) cmd.valueData; Result result = executeCompiledStatement(cs, pvals, cmd.queryTimeout); result = performPostExecute(cmd, result); return result; } }
1.1.10 Session->executeCompiledStatement语句的执行
具体处理见statement章节,此步骤关联到1.2.5章节。
1.2 数据库语句执行持久化流程(JDBCStatement)
1.2.1 JDBCStatement-> executeBatch
publicsynchronizedint[] executeBatch() throws SQLException { if (batchResultOut == null) { batchResultOut = Result.newBatchedExecuteRequest(); } try { resultIn = connection.sessionProxy.execute(batchResultOut); //此处的sessionProxy是session类的接口,session的初始化是在建立数据库连接的时候初始化的 performPostExecute(); } catch (HsqlException e) { batchResultOut.getNavigator().clear(); throw JDBCUtil.sqlException(e); } return updateCounts; }
1.2.2 JDBCStatement-> executeQuery
publicsynchronized ResultSet executeQuery( String sql) throws SQLException { fetchResult(sql, StatementTypes.RETURN_RESULT, JDBCStatementBase.NO_GENERATED_KEYS, null, null); return getResultSet(); }
1.2.3 JDBCStatement-> executeUpdate
/**
 * Executes the given SQL through {@code fetchResult}, requesting an update
 * count and no generated keys.
 *
 * @param sql the SQL statement to execute
 * @return the update count reported by the incoming result
 * @throws SQLException if {@code fetchResult} fails
 */
public synchronized int executeUpdate(String sql) throws SQLException {

    fetchResult(sql, StatementTypes.RETURN_COUNT,
                JDBCStatementBase.NO_GENERATED_KEYS, null, null);

    return resultIn.getUpdateCount();
}
1.2.4 JDBCStatement->fetchResult
privatevoid fetchResult(String sql, int statementRetType, int generatedKeys, int[] generatedIndexes, String[] generatedNames) throws SQLException { if (isEscapeProcessing) { sql = connection.nativeSQL(sql); } resultOut.setPrepareOrExecuteProperties(sql, maxRows, fetchSize, statementRetType, queryTimeout, rsProperties, generatedKeys, generatedIndexes, generatedNames); //将sql转换为Result标准对象 try { resultIn = connection.sessionProxy.execute(resultOut); //此处的sessionProxy是session类的接口,session的初始化是在建立数据库连接的时候初始化的。 performPostExecute(); } catch (HsqlException e) { throw JDBCUtil.sqlException(e); } }
1.2.5 Session
/** * Executes the command encapsulated by the cmd argument. * * @param cmd the command to execute * @return the result of executing the command */ publicsynchronized Result execute(Result cmd) { Object[] pvals = (Object[]) cmd.valueData; Result result = executeCompiledStatement(cs, pvals, cmd.queryTimeout); result = performPostExecute(cmd, result); } public Result executeCompiledStatement(Statement cs, Object[] pvals, int timeout) { Result r; r = cs.execute(this); }
1.2.6 StatementCommand
/**
 * Runs this command statement in the given session and, when the statement
 * is flagged as logged, writes it to the database log.
 *
 * @param session the session executing the statement
 * @return the statement result; an error result when {@code getResult} or
 *         the log write raises a {@code Throwable}
 */
public Result execute(Session session) {

    Result outcome;

    try {
        outcome = getResult(session);
    } catch (Throwable t) {
        outcome = Result.newErrorResult(t, null);
    }

    if (outcome.isError()) {

        // tag the exception with this statement's group and type
        outcome.getException().setStatementType(group, type);

        return outcome;
    }

    // nothing to log: done
    if (!isLogged) {
        return outcome;
    }

    try {
        session.database.logger.writeOtherStatement(session, sql);
    } catch (Throwable e) {
        return Result.newErrorResult(e, sql);
    }

    return outcome;
}
/**
 * Performs a checkpoint after verifying admin and DDL-write rights.
 *
 * <p>The LOB manager is unlocked in all cases.
 *
 * @param session the session requesting the checkpoint
 * @return {@code Result.updateZeroResult} on success, or an error result
 *         when an {@code HsqlException} is raised
 */
Result getResult(Session session) {

    Result outcome;

    try {
        session.checkAdmin();
        session.checkDDLWrite();
        session.database.logger.checkpoint(defrag);

        outcome = Result.updateZeroResult;
    } catch (HsqlException e) {
        outcome = Result.newErrorResult(e, sql);
    } finally {
        session.database.lobManager.unlock();
    }

    return outcome;
}
1.2.7 Logger.java
/** * Performs checkpoint including pre and post operations. Returns to the * same state as before the checkpoint. */ void checkpoint(boolean defrag) { if (filesReadOnly) { return; } if (cache == null) { defrag = false; } elseif (forceDefrag()) { defrag = true; } // test code /* if (database.logger.isStoredFileAccess) { defrag = false; } */ if (defrag) { defrag(); } else { checkpoint(); } } /** * Writes out all the rows to a new file without fragmentation. */ publicvoid defrag() { database.logger.logInfoEvent("defrag start"); try { synchLog(); database.lobManager.synch(); deleteOldDataFiles(); DataFileDefrag dfd = cache.defrag(); database.persistentStoreCollection.setNewTableSpaces(); database.sessionManager.resetLoggedSchemas(); } catch (HsqlException e) { throw e; } catch (Throwable e) { database.logger.logSevereEvent("defrag failure", e); throw Error.error(ErrorCode.DATA_FILE_ERROR, e); }
1.2.8 DataFileCache
/**
 * Rewrites the data file without fragmentation and swaps the new files in,
 * all under {@code writeLock}.
 *
 * @return the {@code DataFileDefrag} instance that processed the data
 */
DataFileDefrag defrag() {

    writeLock.lock();

    try {

        // flush cached rows before copying them to the new file
        cache.saveAll();

        DataFileDefrag dfd = new DataFileDefrag(database, this, dataFileName);

        dfd.process();

        // the old data file is closed and the cache emptied before the swap
        close();
        cache.clear();

        if (!database.logger.propIncrementBackup) {
            backupNewDataFile(true);
        }

        // the new script is written with the new index roots in place;
        // the temporary roots are always cleared afterwards
        database.schemaManager.setTempIndexRoots(dfd.getIndexRoots());

        try {
            database.logger.log.writeScript(false);
        } finally {
            database.schemaManager.setTempIndexRoots(null);
        }

        database.getProperties().setProperty( HsqlDatabaseProperties.hsqldb_script_format, database.logger.propScriptFormat);

        // mark the files as "modified, new" while the renames take place
        database.getProperties().setDBModified( HsqlDatabaseProperties.FILES_MODIFIED_NEW);
        database.logger.log.closeLog();
        database.logger.log.deleteLog();
        database.logger.log.renameNewScript();
        renameBackupFile();
        renameDataFile();

        // renames done: mark the files as not modified
        database.getProperties().setDBModified( HsqlDatabaseProperties.FILES_NOT_MODIFIED);

        // reopen the (new) data file and install the new index roots
        open(false);
        database.schemaManager.setIndexRoots(dfd.getIndexRoots());

        if (database.logger.log.dbLogWriter != null) {
            database.logger.log.openLog();
        }

        return dfd;
    } finally {
        writeLock.unlock();
    }
}
1.2.9 DataFileDefrag
--- 索引的处理
void process() { Throwable error = null; database.logger.logDetailEvent("Defrag process begins"); HsqlArrayList allTables = database.schemaManager.getAllTables(true); rootsList = newlong[allTables.size()][]; long maxSize = 0; for (int i = 0, tSize = allTables.size(); i < tSize; i++) { Table table = (Table) allTables.get(i); if (table.getTableType() == TableBase.CACHED_TABLE) { PersistentStore store = database.persistentStoreCollection.getStore(table); long size = store.elementCount(); if (size > maxSize) { maxSize = size; } } } if (maxSize > Integer.MAX_VALUE) { throw Error.error(ErrorCode.X_2200T); } try { pointerLookup = new DoubleIntIndex((int) maxSize, false); dataFileOut = new DataFileCache(database, dataFileName, true); pointerLookup.setKeysSearchTarget(); for (int i = 0, tSize = allTables.size(); i < tSize; i++) { Table t = (Table) allTables.get(i); if (t.getTableType() == TableBase.CACHED_TABLE) { long[] rootsArray = writeTableToDataFile(t); rootsList[i] = rootsArray; } else { rootsList[i] = null; } database.logger.logDetailEvent("table complete " + t.getName().name); } dataFileOut.fileModified = true; dataFileOut.close(); dataFileOut = null; for (int i = 0, size = rootsList.length; i < size; i++) { long[] roots = rootsList[i]; if (roots != null) { database.logger.logDetailEvent("roots: " + StringUtil.getList(roots, ",", "")); } } } catch (OutOfMemoryError e) { error = e; throw Error.error(ErrorCode.OUT_OF_MEMORY, e); } catch (Throwable t) { error = t; throw Error.error(ErrorCode.GENERAL_ERROR, t); } finally { try { if (dataFileOut != null) { dataFileOut.release(); } } catch (Throwable t) {} if (error instanceof OutOfMemoryError) { database.logger.logInfoEvent( "defrag failed - out of memory - required: " + maxSize * 8); } if (error == null) { database.logger.logDetailEvent("Defrag transfer complete: " + stopw.elapsedTime()); } else { database.logger.logSevereEvent("defrag failed ", error); database.logger.getFileAccess().removeElement(dataFileName + 
Logger.newFileExtension); } } }
/**
 * Moves the rows of one table into the new data file and translates the
 * table's index roots to their new positions.
 *
 * @param table the table to write
 * @return the table's index roots array with translated root positions and
 *         updated trailing entries
 */
long[] writeTableToDataFile(Table table) {

    PersistentStore store = table.database.persistentStoreCollection.getStore(table);
    TableSpaceManager space = dataFileOut.spaceManager.getTableSpace(table.getSpaceID());
    long[] rootsArray = table.getIndexRootsArray();

    // the lookup is reused per table, so start from an empty mapping
    pointerLookup.clear();
    database.logger.logDetailEvent("lookup begins " + table.getName().name + " " + stopw.elapsedTime());
    RowStoreAVLDisk.moveDataToSpace(store, dataFileOut, space, pointerLookup);

    // translate each index root from its old position to the new one;
    // -1 entries are skipped
    for (int i = 0; i < table.getIndexCount(); i++) {
        if (rootsArray[i] == -1) {
            continue;
        }

        long pos = pointerLookup.lookup(rootsArray[i], -1);

        if (pos == -1) {
            throw Error.error(ErrorCode.DATA_FILE_ERROR);
        }

        rootsArray[i] = pos;
    }

    // log any discrepency in row count
    long count = rootsArray[table.getIndexCount() * 2];

    if (count != pointerLookup.size()) {
        database.logger.logSevereEvent("discrepency in row count " + table.getName().name + " " + count + " " + pointerLookup.size(), null);
    }

    // NOTE(review): the slots at indexCount and indexCount * 2 presumably
    // hold per-table bookkeeping (the latter a row count) - confirm
    rootsArray[table.getIndexCount()] = 0;
    rootsArray[table.getIndexCount() * 2] = pointerLookup.size();

    database.logger.logDetailEvent("table written " + table.getName().name);

    return rootsArray;
}
1.2.10 DataFileCache
/**
 * Creates a cache over a new data file, as used by the defrag process.
 *
 * <p>The {@code defrag} argument is not read in this constructor body; the
 * literal {@code true} is what is passed to {@code initParams}.
 *
 * @param db           the owning database
 * @param baseFileName base name of the database files
 * @param defrag       not read in this constructor body
 */
public DataFileCache(Database db, String baseFileName, boolean defrag) {

    initParams(db, baseFileName, true);

    cache = new Cache(this);

    // pick the file implementation matching the access mode
    try {
        if (!database.logger.isStoredFileAccess()) {
            dataFile = new RAFileSimple(database, dataFileName, "rw");
        } else {
            dataFile = RAFile.newScaledRAFile(database, dataFileName,
                                              false,
                                              RAFile.DATA_FILE_STORED);
        }
    } catch (Throwable t) {
        throw Error.error(ErrorCode.FILE_IO_ERROR, t);
    }

    initNewFile();
    initBuffers();

    // pick the space manager matching the data file layout
    if (!database.logger.isDataFileSpaces()) {
        spaceManager = new DataSpaceManagerSimple(this);
    } else {
        spaceManager = new DataSpaceManagerBlocks(this);
    }
}
1.2.11 RowStoreAVLDisk
/**
 * Copies all rows of a store into a new data file in two passes.
 *
 * <p>The first pass asks the table space for a new file position for each
 * row and records the old-to-new mapping in {@code pointerLookup}. The
 * second pass serialises each row with that mapping and saves it at its
 * new position.
 *
 * @param store         the store whose rows are moved
 * @param cache         the cache of the target data file
 * @param tableSpace    the space manager handing out new positions
 * @param pointerLookup receives the old-to-new position mapping
 */
public static void moveDataToSpace(PersistentStore store,
                                   DataFileCache cache,
                                   TableSpaceManager tableSpace,
                                   LongLookup pointerLookup) {

    RowIterator it = store.rowIterator();

    // pass 1: allocate new positions
    while (it.hasNext()) {
        CachedObject row = it.getNextRow();
        long newPos = tableSpace.getFilePosition(row.getStorageSize(),
            false);

        pointerLookup.addUnsorted(row.getPos(), newPos);
    }

    it = store.rowIterator();

    // pass 2: write each row at its new position
    while (it.hasNext()) {
        CachedObject row = it.getNextRow();

        cache.rowOut.reset();
        row.write(cache.rowOut, pointerLookup);

        long pos = pointerLookup.lookup(row.getPos());

        cache.saveRowOutput(pos);
    }
}
1.2.12 DataFileCache
/**
 * Writes the current contents of {@code rowOut} to the data file.
 *
 * @param pos the row position; multiplied by {@code dataFileScale} to get
 *            the file offset to seek to
 */
public void saveRowOutput(long pos) {

    try {
        dataFile.seek(pos * dataFileScale);
        dataFile.write(rowOut.getOutputStream().getBuffer(), 0,
                       rowOut.getOutputStream().size());
    } catch (Throwable t) {
        logSevereEvent("DataFileCache.saveRowOutput", t, pos);

        throw Error.error(ErrorCode.DATA_FILE_ERROR, t);
    }
}
1.2.13 RAFileSimple
RAFileHybrid.java,RAFileInJar.java,RAFileNIO.java, RAFileSimple RandomAccessFile file = new RandomAccessFile(name, openMode); public void write(byte[] b, int off, int len) throws IOException { file.write(b, off, len); }
例如,如果项目中存在`HSQLDB`的jar,Spring Boot会自动配置一个内存数据库。 3. **嵌入式Web服务器**:Spring Boot支持内嵌Tomcat、Jetty或Undertow等Web服务器,无需额外部署,简化了开发流程。 4. **Actuator**...
在"SpringBoot经典学习笔记"中,你可能会了解到以下关键知识点: 1. **起步依赖(Starter Dependencies)**:SpringBoot通过starter依赖来简化构建配置,比如`spring-boot-starter-web`用于Web应用,`spring-boot-...
**Spring Boot核心技术详解** Spring Boot是由Pivotal团队提供的全新框架,其设计目标是为了简化Spring应用的...希望这份学习笔记能帮助你深入理解和掌握Spring Boot的精髓,祝你在Spring Boot的学习之路上一帆风顺!
### jBoss + Tomcat 学习笔记大全及帮助文档概览 #### 一、环境搭建与配置 在开始深入探讨jBoss与Tomcat的集成之前,我们先来了解如何搭建基本的开发环境。 ##### 1. Java 环境配置 - **安装 J2SDK1.4+ 和 J2...
在"springboot-guides"的学习笔记中,我们可以深入探讨以下几个核心知识点: 1. **起步依赖(Starters)**: SpringBoot通过起步依赖来简化Maven或Gradle的配置,每个起步依赖都是一个包含相关依赖的模块集合,如...
6. **Cloud Connectors**:Spring Boot Cloud Connectors帮助在云环境中快速连接到数据库、消息队列等服务,使得应用能在不同云平台上无缝迁移。 7. **YAML/Properties配置**:Spring Boot支持使用YAML或Properties...
例如,如果在类路径下发现了`HSQLDB`数据库,Spring Boot将自动配置一个内存数据库连接。 3. **内嵌Web服务器(Embedded Web Server)** Spring Boot支持内嵌Tomcat、Jetty或Undertow等Web服务器,无需额外安装和...