public Result executeCompiledStatement(Statement cs, Object[] pvals) { Result r; if (abortTransaction) { rollback(false); return Result.newErrorResult(Error.error(ErrorCode.X_40001)); } if (sessionContext.depth > 0) { if (sessionContext.noSQL.booleanValue() || cs.isAutoCommitStatement()) { return Result.newErrorResult(Error.error(ErrorCode.X_46000)); } } if (cs.isAutoCommitStatement()) { if (isReadOnly()) { return Result.newErrorResult(Error.error(ErrorCode.X_25006)); } try { /** special autocommit for backward compatibility */ commit(false); } catch (HsqlException e) { database.logger.logInfoEvent("Exception at commit"); } } sessionContext.currentStatement = cs; boolean isTX = cs.isTransactionStatement(); if (!isTX) { if (database.logger.getSqlEventLogLevel() >= SimpleLog.LOG_NORMAL) { sessionContext.setDynamicArguments(pvals); database.logger.logStatementEvent(this, cs, pvals, SimpleLog.LOG_NORMAL); } r = cs.execute(this); sessionContext.currentStatement = null; return r; } while (true) { actionIndex = rowActionList.size(); database.txManager.beginAction(this, cs); cs = sessionContext.currentStatement; if (cs == null) { return Result.newErrorResult(Error.error(ErrorCode.X_07502)); } if (abortTransaction) { rollback(false); sessionContext.currentStatement = null; return Result.newErrorResult(Error.error(ErrorCode.X_40001)); } try { latch.await(); } catch (InterruptedException e) { abortTransaction = true; } if (abortTransaction) { rollback(false); sessionContext.currentStatement = null; return Result.newErrorResult(Error.error(ErrorCode.X_40001)); } database.txManager.beginActionResume(this); // tempActionHistory.add("sql execute " + cs.sql + " " + actionTimestamp + " " + rowActionList.size()); sessionContext.setDynamicArguments(pvals); if (database.logger.getSqlEventLogLevel() >= SimpleLog.LOG_NORMAL) { database.logger.logStatementEvent(this, cs, pvals, SimpleLog.LOG_NORMAL); } r = cs.execute(this); lockStatement = sessionContext.currentStatement; // tempActionHistory.add("sql execute end " + actionTimestamp + " " + rowActionList.size()); endAction(r); if (abortTransaction) { rollback(false); sessionContext.currentStatement = null; return Result.newErrorResult(Error.error(r.getException(), ErrorCode.X_40001, null)); } if (redoAction) { redoAction = false; try { latch.await(); } catch (InterruptedException e) { abortTransaction = true; } } else { break; } } if (sessionContext.depth == 0 && (sessionContext.isAutoCommit.booleanValue() || cs.isAutoCommitStatement())) { try { if (r.mode == ResultConstants.ERROR) { rollback(false); } else { commit(false); } } catch (Exception e) { sessionContext.currentStatement = null; return Result.newErrorResult(Error.error(ErrorCode.X_40001, e)); } } sessionContext.currentStatement = null; return r; }
database.txManager.beginAction(this, cs); 事务执行开始阶段, 这里事务处理用简单的方式进行处理,
/** * add session to the end of queue when a transaction starts * (depending on isolation mode) */ public void beginAction(Session session, Statement cs) { if (session.isTransaction) { return; } if (cs == null) { return; } writeLock.lock(); try { if (cs.getCompileTimestamp() < database.schemaManager.getSchemaChangeTimestamp()) { cs = session.statementManager.getStatement(session, cs); session.sessionContext.currentStatement = cs; if (cs == null) { return; } } session.isPreTransaction = true; if (!isLockedMode && !cs.isCatalogLock()) { return; } beginActionTPL(session, cs); } finally { writeLock.unlock(); } }
void lockTablesTPL(Session session, Statement cs) { if (cs == null || session.abortTransaction) { return; } HsqlName[] nameList = cs.getTableNamesForWrite(); for (int i = 0; i < nameList.length; i++) { HsqlName name = nameList[i]; if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) { continue; } tableWriteLocks.put(name, session); } nameList = cs.getTableNamesForRead(); for (int i = 0; i < nameList.length; i++) { HsqlName name = nameList[i]; if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) { continue; } tableReadLocks.put(name, session); } }
执行过程 r = cs.execute(this);
Result getResult(Session session) { Result resultOut = null; RowSetNavigator generatedNavigator = null; PersistentStore store = baseTable.getRowStore(session); int count; if (generatedIndexes != null) { resultOut = Result.newUpdateCountResult(generatedResultMetaData, 0); generatedNavigator = resultOut.getChainedResult().getNavigator(); } if (isSimpleInsert) { Type[] colTypes = baseTable.getColumnTypes(); Object[] data = getInsertData(session, colTypes, insertExpression.nodes[0].nodes); return insertSingleRow(session, store, data); } RowSetNavigator newDataNavigator = queryExpression == null ? getInsertValuesNavigator(session) : getInsertSelectNavigator(session); count = newDataNavigator.getSize(); if (count > 0) { insertRowSet(session, generatedNavigator, newDataNavigator); } if (baseTable.triggerLists[Trigger.INSERT_AFTER].length > 0) { baseTable.fireTriggers(session, Trigger.INSERT_AFTER, newDataNavigator); } if (resultOut == null) { resultOut = new Result(ResultConstants.UPDATECOUNT, count); } else { resultOut.setUpdateCount(count); } if (count == 0) { session.addWarning(HsqlException.noDataCondition); } return resultOut; }
PersistentStore store = baseTable.getRowStore(session);就是获取这个表对应的内存存储
终于来到 insertSingleRow插入一条记录操作了
Result insertSingleRow(Session session, PersistentStore store, Object[] data) { if (baseTable.triggerLists[Trigger.INSERT_BEFORE_ROW].length > 0) { baseTable.fireTriggers(session, Trigger.INSERT_BEFORE_ROW, null, data, null); } baseTable.insertSingleRow(session, store, data, null); performIntegrityChecks(session, baseTable, null, data, null); if (session.database.isReferentialIntegrity()) { for (int i = 0, size = baseTable.fkConstraints.length; i < size; i++) { baseTable.fkConstraints[i].checkInsert(session, baseTable, data, true); } } if (baseTable.triggerLists[Trigger.INSERT_AFTER_ROW].length > 0) { baseTable.fireTriggers(session, Trigger.INSERT_AFTER_ROW, null, data, null); } if (baseTable.triggerLists[Trigger.INSERT_AFTER].length > 0) { baseTable.fireTriggers(session, Trigger.INSERT_AFTER, (RowSetNavigator) null); } return Result.updateOneResult; }
/** * Mid level method for inserting single rows. Performs constraint checks and * fires row level triggers. */ Row insertSingleRow(Session session, PersistentStore store, Object[] data, int[] changedCols) { if (identityColumn != -1) { setIdentityColumn(session, data); } if (hasGeneratedValues) { setGeneratedColumns(session, data); } if (hasDomainColumns || hasNotNullColumns) { enforceRowConstraints(session, data); } if (isView) { // may have domain column return null; } Row row = (Row) store.getNewCachedObject(session, data, true); session.addInsertAction(this, store, row, changedCols); return row; }
/** * If there is an identity column in the table, sets * the value and/or adjusts the identiy value for the table. */ protected void setIdentityColumn(Session session, Object[] data) { if (identityColumn != -1) { Number id = (Number) data[identityColumn]; if (identitySequence.getName() == null) { if (id == null) { id = (Number) identitySequence.getValueObject(); data[identityColumn] = id; } else { identitySequence.userUpdate(id.longValue()); } } else { if (id == null) { id = (Number) session.sessionData.getSequenceValue( identitySequence); data[identityColumn] = id; } } if (session != null) { session.setLastIdentity(id); } } }
setGeneratedColumns 处理有表达式的列数据生成规则,
public void setGeneratedColumns(Session session, Object[] data) { if (hasGeneratedValues) { for (int i = 0; i < colGenerated.length; i++) { if (colGenerated[i]) { Expression e = getColumn(i).getGeneratingExpression(); RangeIteratorBase range = session.sessionContext.getCheckIterator( getDefaultRanges()[0]); range.currentData = data; data[i] = e.getValue(session, colTypes[i]); } } } }
enforceRowConstraints(session, data); 处理列的约束,如果整数是否超过限制等
/** * Enforce max field sizes according to SQL column definition. * SQL92 13.8 */ public void enforceRowConstraints(Session session, Object[] data) { for (int i = 0; i < columnCount; i++) { Type type = colTypes[i]; ColumnSchema column; if (hasDomainColumns && type.isDomainType()) { Constraint[] constraints = type.userTypeModifier.getConstraints(); column = getColumn(i); for (int j = 0; j < constraints.length; j++) { constraints[j].checkCheckConstraint(session, this, column, (Object) data[i]); } } if (colNotNull[i] && data[i] == null) { String constraintName; Constraint c = getNotNullConstraintForColumn(i); if (c == null) { if (ArrayUtil.find(this.primaryKeyCols, i) > -1) { c = this.getPrimaryConstraint(); } } constraintName = c == null ? "" : c.getName().name; column = getColumn(i); String[] info = new String[] { constraintName, tableName.statementName, column.getName().statementName }; throw Error.error(null, ErrorCode.X_23502, ErrorCode.COLUMN_CONSTRAINT, info); } } }
Row row = (Row) store.getNewCachedObject(session, data, true); 申请内存空间
public CachedObject getNewCachedObject(Session session, Object object, boolean tx) { int id; synchronized (this) { id = rowIdSequence++; } Row row = new RowAVL(table, (Object[]) object, id, this); if (tx) { RowAction action = new RowAction(session, table, RowAction.ACTION_INSERT, row, null); row.rowAction = action; } return row; }
RowAction action = new RowAction(session, table,
RowAction.ACTION_INSERT, row,
表操作Represents the chain of insert / delete / rollback / commit actions on a row.
插入之前做的事情太多了,session.addInsertAction(this, store, row, changedCols); 终于来到操作插入的过程了。
public void addInsertAction(Session session, Table table, PersistentStore store, Row row, int[] changedColumns) { RowAction action = row.rowAction; if (action == null) { /* System.out.println("null insert action " + session + " " + session.actionTimestamp); */ throw Error.runtimeError(ErrorCode.GENERAL_ERROR, "null insert action "); } store.indexRow(session, row); session.rowActionList.add(action); row.rowAction = null; }
用AVL tree保存表的数据信息,索引而且每个节点保存数据,普通的AVL数操作,插入过程还加了写锁
/** * Insert a node into the index */ public void insert(Session session, PersistentStore store, Row row) { NodeAVL n; NodeAVL x; boolean isleft = true; int compare = -1; final Object[] rowData = row.getData(); boolean compareRowId = !isUnique || hasNulls(session, rowData); boolean compareSimple = isSimple; writeLock.lock(); try { n = getAccessor(store); x = n; if (n == null) { store.setAccessor(this, ((RowAVL) row).getNode(position)); return; } while (true) { Row currentRow = n.row; compare = 0; if (compareSimple) { compare = colTypes[0].compare(session, rowData[colIndex[0]], currentRow.getData()[colIndex[0]]); if (compare == 0 && compareRowId) { compare = compareRowForInsertOrDelete(session, row, currentRow, compareRowId, 1); } } else { compare = compareRowForInsertOrDelete(session, row, currentRow, compareRowId, 0); } // after the first match and check, all compares are with row id if (compare == 0 && session != null && !compareRowId && session.database.txManager.isMVRows()) { if (!isEqualReadable(session, store, n)) { compareRowId = true; compare = compareRowForInsertOrDelete(session, row, currentRow, compareRowId, colIndex.length); } } if (compare == 0) { if (isConstraint) { Constraint c = ((Table) table).getUniqueConstraintForIndex(this); throw c.getException(row.getData()); } else { throw Error.error(ErrorCode.X_23505, name.statementName); } } isleft = compare < 0; x = n; n = isleft ? x.nLeft : x.nRight; if (n == null) { break; } } x = x.set(store, isleft, ((RowAVL) row).getNode(position)); balance(store, x, isleft); } finally { writeLock.unlock(); } }
/** * Balances part of the tree after an alteration to the index. */ void balance(PersistentStore store, NodeAVL x, boolean isleft) { while (true) { int sign = isleft ? 1 : -1; switch (x.iBalance * sign) { case 1 : x.iBalance = 0; return; case 0 : x.iBalance = -sign; break; case -1 : NodeAVL l = isleft ? x.nLeft : x.nRight; if (l.iBalance == -sign) { x.replace(store, this, l); x.set(store, isleft, l.child(store, !isleft)); l.set(store, !isleft, x); x.iBalance = 0; l.iBalance = 0; } else { NodeAVL r = !isleft ? l.nLeft : l.nRight; x.replace(store, this, r); l.set(store, !isleft, r.child(store, isleft)); r.set(store, isleft, l); x.set(store, isleft, r.child(store, !isleft)); r.set(store, !isleft, x); int rb = r.iBalance; x.iBalance = (rb == -sign) ? sign : 0; l.iBalance = (rb == sign) ? -sign : 0; r.iBalance = 0; } return; } if (x.nParent == null) { return; } isleft = x.nParent == null || x == x.nParent.nLeft; x = x.nParent; } }
执行完之后 endAction(r); 事务处理结束,出错则回滚,成功则提交
public void endAction(Result result) { // tempActionHistory.add("endAction " + actionTimestamp); sessionData.persistentStoreCollection.clearStatementTables(); if (result.mode == ResultConstants.ERROR) { sessionData.persistentStoreCollection.clearResultTables( actionTimestamp); database.txManager.rollbackAction(this); } else { sessionContext .diagnosticsVariables[ExpressionColumn.idx_row_count] = result.mode == ResultConstants.UPDATECOUNT ? Integer.valueOf(result.getUpdateCount()) : ValuePool.INTEGER_0; database.txManager.completeActions(this); } // tempActionHistory.add("endAction ends " + actionTimestamp); }
void endActionTPL(Session session) { if (session.isolationLevel == SessionInterface.TX_REPEATABLE_READ || session.isolationLevel == SessionInterface.TX_SERIALIZABLE) { return; } if (session.sessionContext.currentStatement == null) { // after java function / proc with db access return; } if (session.sessionContext.depth > 0) { // routine or trigger return; } HsqlName[] readLocks = session.sessionContext.currentStatement.getTableNamesForRead(); if (readLocks.length == 0) { return; } writeLock.lock(); try { unlockReadTablesTPL(session, readLocks); final int waitingCount = session.waitingSessions.size(); if (waitingCount == 0) { return; } boolean canUnlock = false; // if write lock was used for read lock for (int i = 0; i < readLocks.length; i++) { if (tableWriteLocks.get(readLocks[i]) != session) { canUnlock = true; break; } } if (!canUnlock) { return; } canUnlock = false; for (int i = 0; i < waitingCount; i++) { Session current = (Session) session.waitingSessions.get(i); if (current.abortTransaction) { canUnlock = true; break; } Statement currentStatement = current.sessionContext.currentStatement; if (currentStatement == null) { canUnlock = true; break; } if (ArrayUtil.containsAny( readLocks, currentStatement.getTableNamesForWrite())) { canUnlock = true; break; } } if (!canUnlock) { return; } resetLocks(session); resetLatchesMidTransaction(session); } finally { writeLock.unlock(); } }
