`

hsqldb源码分析系列6之事务处理

 
阅读更多

  

  在session的 public Result executeCompiledStatement(Statement cs, Object[] pvals)方法执行中会处理事务

  

        boolean isTX = cs.isTransactionStatement();

        if (!isTX) {
            if (database.logger.getSqlEventLogLevel()
                    >= SimpleLog.LOG_NORMAL) {
                sessionContext.setDynamicArguments(pvals);
                database.logger.logStatementEvent(this, cs, pvals,
                                                  SimpleLog.LOG_NORMAL);
            }

            r                               = cs.execute(this);
            sessionContext.currentStatement = null;

            return r;
        }

        while (true) {
            actionIndex = rowActionList.size();

            database.txManager.beginAction(this, cs);

            cs = sessionContext.currentStatement;

            if (cs == null) {
                return Result.newErrorResult(Error.error(ErrorCode.X_07502));
            }

            if (abortTransaction) {
                rollback(false);

                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(ErrorCode.X_40001));
            }

            try {
                latch.await();
            } catch (InterruptedException e) {
                abortTransaction = true;
            }

            if (abortTransaction) {
                rollback(false);

                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(ErrorCode.X_40001));
            }

            database.txManager.beginActionResume(this);

            //        tempActionHistory.add("sql execute " + cs.sql + " " + actionTimestamp + " " + rowActionList.size());
            sessionContext.setDynamicArguments(pvals);

            if (database.logger.getSqlEventLogLevel()
                    >= SimpleLog.LOG_NORMAL) {
                database.logger.logStatementEvent(this, cs, pvals,
                                                  SimpleLog.LOG_NORMAL);
            }

            r             = cs.execute(this);
            lockStatement = sessionContext.currentStatement;

            //        tempActionHistory.add("sql execute end " + actionTimestamp + " " + rowActionList.size());
            endAction(r);

            if (abortTransaction) {
                rollback(false);

                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(r.getException(),
                        ErrorCode.X_40001, null));
            }

            if (redoAction) {
                redoAction = false;

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    abortTransaction = true;
                }
            } else {
                break;
            }
        }

        if (sessionContext.depth == 0
                && (sessionContext.isAutoCommit.booleanValue()
                    || cs.isAutoCommitStatement())) {
            try {
                if (r.mode == ResultConstants.ERROR) {
                    rollback(false);
                } else {
                    commit(false);
                }
            } catch (Exception e) {
                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(ErrorCode.X_40001,
                        e));
            }
        }

        sessionContext.currentStatement = null;

        return r;

 

   database.txManager.beginAction(this, cs);  事务开始前需要先对当前的statement cs需要操作的表进行加锁,读表加读锁,写的需要加写锁

   这是两阶段提交协议的事务实现TransactionManager2PL的实现

    

 public void beginAction(Session session, Statement cs) {

        if (session.hasLocks(cs)) {
            return;
        }

        writeLock.lock();

        try {
            if (cs.getCompileTimestamp()
                    < database.schemaManager.getSchemaChangeTimestamp()) {
                cs = session.statementManager.getStatement(session, cs);
                session.sessionContext.currentStatement = cs;

                if (cs == null) {
                    return;
                }
            }

            boolean canProceed = setWaitedSessionsTPL(session, cs);

            if (canProceed) {
                if (session.tempSet.isEmpty()) {
                    lockTablesTPL(session, cs);

                    // we don't set other sessions that would now be waiting for this one too
                    // next lock release will do it
                } else {
                    setWaitingSessionTPL(session);
                }
            }
        } finally {
            writeLock.unlock();
        }
    }

 session.hasLocks(cs) 判断是否当前的statement是否已经加了锁了

  

    public boolean hasLocks(Statement statement) {

        if (lockStatement == statement) {
            if (isolationLevel == SessionInterface.TX_REPEATABLE_READ
                    || isolationLevel == SessionInterface.TX_SERIALIZABLE) {
                return true;
            }

            if (statement.getTableNamesForRead().length == 0) {
                return true;
            }
        }

        return false;
    }

  boolean canProceed = setWaitedSessionsTPL(session, cs); 判断下当前Statement对读写的表是否能够加锁成功,temSet保存每个session依赖的其他session的锁,checkDeadlock(session, session.tempSet)检测下session依赖的锁是否会出现死锁,如果加锁失败则设置session.abortTransaction = true;,后面就直接回滚事务。

  

boolean setWaitedSessionsTPL(Session session, Statement cs) {

        session.tempSet.clear();

        if (cs == null) {
            return true;
        }

        if (session.abortTransaction) {
            return false;
        }

        HsqlName[] nameList = cs.getTableNamesForWrite();

        for (int i = 0; i < nameList.length; i++) {
            HsqlName name = nameList[i];

            if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
                continue;
            }

            Session holder = (Session) tableWriteLocks.get(name);

            if (holder != null && holder != session) {
                session.tempSet.add(holder);
            }

            Iterator it = tableReadLocks.get(name);

            while (it.hasNext()) {
                holder = (Session) it.next();

                if (holder != session) {
                    session.tempSet.add(holder);
                }
            }
        }

        nameList = cs.getTableNamesForRead();

        if (txModel == TransactionManager.MVLOCKS && session.isReadOnly()) {
            nameList = catalogNameList;
        }

        for (int i = 0; i < nameList.length; i++) {
            HsqlName name = nameList[i];

            if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
                continue;
            }

            Session holder = (Session) tableWriteLocks.get(name);

            if (holder != null && holder != session) {
                session.tempSet.add(holder);
            }
        }

        if (session.tempSet.isEmpty()) {
            return true;
        }

        if (checkDeadlock(session, session.tempSet)) {
            return true;
        }

        session.tempSet.clear();

        session.abortTransaction = true;

        return false;
    }

 

    boolean checkDeadlock(Session session, OrderedHashSet newWaits) {

        int size = session.waitingSessions.size();

        for (int i = 0; i < size; i++) {
            Session current = (Session) session.waitingSessions.get(i);

            if (newWaits.contains(current)) {
                return false;
            }

            if (!checkDeadlock(current, newWaits)) {
                return false;
            }
        }

        return true;
    }

 lockTablesTPL(session, cs);就是对读的表加读锁,对写的表加写锁, setWaitingSessionTPL(session); 把依赖当前session的锁的session加到当前session的等待session队列中,

  

void lockTablesTPL(Session session, Statement cs) {

        if (cs == null || session.abortTransaction) {
            return;
        }

        HsqlName[] nameList = cs.getTableNamesForWrite();

        for (int i = 0; i < nameList.length; i++) {
            HsqlName name = nameList[i];

            if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
                continue;
            }

            tableWriteLocks.put(name, session);
        }

        nameList = cs.getTableNamesForRead();

        for (int i = 0; i < nameList.length; i++) {
            HsqlName name = nameList[i];

            if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
                continue;
            }

            tableReadLocks.put(name, session);
        }
    }

加锁失败就执行

   

 if (abortTransaction) {
                rollback(false);

                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(ErrorCode.X_40001));
            }

   database.txManager.beginActionResume(this);设置当前session的事务执行时间和事务数

  

    public void beginActionResume(Session session) {

        session.actionTimestamp = nextChangeTimestamp();

        if (!session.isTransaction) {
            session.transactionTimestamp = session.actionTimestamp;
            session.isTransaction        = true;

            transactionCount++;
        }

        return;
    }

   cs.execute(this);执行完之后执行endAction(r); 如果执行失败就rollback,成功就commit

   

    public void endAction(Result result) {

//        tempActionHistory.add("endAction " + actionTimestamp);
        sessionData.persistentStoreCollection.clearStatementTables();

        if (result.mode == ResultConstants.ERROR) {
            sessionData.persistentStoreCollection.clearResultTables(
                actionTimestamp);
            database.txManager.rollbackAction(this);
        } else {
            sessionContext
                .diagnosticsVariables[ExpressionColumn.idx_row_count] =
                    result.mode == ResultConstants.UPDATECOUNT
                    ? Integer.valueOf(result.getUpdateCount())
                    : ValuePool.INTEGER_0;

            database.txManager.completeActions(this);
        }

//        tempActionHistory.add("endAction ends " + actionTimestamp);
    }

 先看看completeActions(this);就是释放锁,然后唤醒等待该session的锁的其他session

  

 void endActionTPL(Session session) {

        if (session.isolationLevel == SessionInterface.TX_REPEATABLE_READ
                || session.isolationLevel
                   == SessionInterface.TX_SERIALIZABLE) {
            return;
        }

        if (session.sessionContext.currentStatement == null) {

            // after java function / proc with db access
            return;
        }

        if (session.sessionContext.depth > 0) {

            // routine or trigger
            return;
        }

        HsqlName[] readLocks =
            session.sessionContext.currentStatement.getTableNamesForRead();

        if (readLocks.length == 0) {
            return;
        }

        writeLock.lock();

        try {
            unlockReadTablesTPL(session, readLocks);

            final int waitingCount = session.waitingSessions.size();

            if (waitingCount == 0) {
                return;
            }

            boolean canUnlock = false;

            // if write lock was used for read lock
            for (int i = 0; i < readLocks.length; i++) {
                if (tableWriteLocks.get(readLocks[i]) != session) {
                    canUnlock = true;

                    break;
                }
            }

            if (!canUnlock) {
                return;
            }

            canUnlock = false;

            for (int i = 0; i < waitingCount; i++) {
                Session current = (Session) session.waitingSessions.get(i);

                if (current.abortTransaction) {
                    canUnlock = true;

                    break;
                }

                Statement currentStatement =
                    current.sessionContext.currentStatement;

                if (currentStatement == null) {
                    canUnlock = true;

                    break;
                }

                if (ArrayUtil.containsAny(
                        readLocks, currentStatement.getTableNamesForWrite())) {
                    canUnlock = true;

                    break;
                }
            }

            if (!canUnlock) {
                return;
            }

            resetLocks(session);
            resetLatchesMidTransaction(session);
        } finally {
            writeLock.unlock();
        }
    }

 

我们看看rollbackAction的回滚操作,之前每个insert,update,delete操作都会有一个action加到当前session的actionlist,回滚的时候根据这个actionlist来对每个action进行回滚,还需要根据当前事务的时间戳找到需要执行操作的acion列表,

  

    public void rollbackAction(Session session) {
        rollbackPartial(session, session.actionIndex, session.actionTimestamp);
        endActionTPL(session);
    }

 

 void rollbackPartial(Session session, int start, long timestamp) {

        Object[] list  = session.rowActionList.getArray();
        int      limit = session.rowActionList.size();

        if (start == limit) {
            return;
        }

        for (int i = limit - 1; i >= start; i--) {
            RowAction action = (RowAction) list[i];

            if (action == null || action.type == RowActionBase.ACTION_NONE
                    || action.type == RowActionBase.ACTION_DELETE_FINAL) {
                continue;
            }

            Row row = action.memoryRow;

            if (row == null) {
                row = (Row) action.store.get(action.getPos(), false);
            }

            if (row == null) {
                continue;
            }

            action.rollback(session, timestamp);

            int type = action.mergeRollback(session, timestamp, row);

            action.store.rollbackRow(session, row, type, txModel);
        }

        session.rowActionList.setSize(start);
    }

 

    public void rollbackRow(Session session, Row row, int changeAction,
                            int txModel) {

        switch (changeAction) {

            case RowAction.ACTION_DELETE :
                if (txModel == TransactionManager.LOCKS) {
                    ((RowAVL) row).setNewNodes(this);
                    indexRow(session, row);
                }
                break;

            case RowAction.ACTION_INSERT :
                if (txModel == TransactionManager.LOCKS) {
                    delete(session, row);
                    remove(row.getPos());
                }
                break;

            case RowAction.ACTION_INSERT_DELETE :

                // INSERT + DELETE
                if (txModel == TransactionManager.LOCKS) {
                    remove(row.getPos());
                }
                break;
        }
    }

 mergeRollback还会对同一个行如果有插入再删除则会把两个action做合并之类的。

 

事务隔离级别在2PL的处理

   

If a table is read-only, it will not be locked by any transaction.
The READ UNCOMMITTED isolation level can be used in 2PL modes for read-only operations. It is the same as
READ COMMITTED plus read only.
The READ COMMITTED isolation level is the default. It keeps write locks on tables until commit, but releases the
read locks after each operation.
The REPEATABLE READ level is upgraded to SERIALIZABLE. These levels keep both read and write locks on
tables until commit.
It is possible to perform some critical operations at the SERIALIZABLE level, while the rest of the operations are
performed at the READ COMMITTED level.

 

 

0
0
分享到:
评论

相关推荐

    HSQLDB

    源码分析** 由于HSQLDB是开源的,开发者可以深入研究其源码,理解数据库的内部工作原理,如查询解析、执行计划生成、事务管理等,这对提升数据库相关的技术能力非常有帮助。 总之,HSQLDB作为一个轻量级、高性能...

    开源数据库软件hsqldb

    五、源码分析与工具支持 HSQldb的源码开放,开发者可以深入了解数据库的实现机制,这对于学习数据库设计和优化非常有帮助。同时,HSQldb支持多种管理工具,如SQuirreL SQL Client、DBVisualizer等,方便用户进行...

    <转>HSQLDB 安装与使用

    ### 源码分析 HSQLDB是用纯Java编写的,因此对于开发者来说,查看源码可以深入了解其工作原理。主要的类包括`org.hsqldb.Server`(服务器进程)、`org.hsqldb.jdbc.JDBCConnection`(JDBC连接)以及`org.hsqldb....

    HSQLDB的使用

    **源码分析** HSQLDB是用Java编写的,因此其源码可读性较高,对于学习数据库原理和实现有很高的价值。通过阅读源码,开发者可以深入理解SQL的执行流程、事务管理、索引构建等核心概念。 **工具支持** HSQLDB提供...

    HSQLDB用户指导学习手册

    - **事务处理问题**:探讨了在处理事务时可能遇到的问题及解决方案。 - **新特性和变更**:介绍了HSQLDB的最新特性和发展方向,帮助用户了解未来的发展趋势。 #### 四、UNIX快速入门 - **目的**:这部分专为UNIX...

    Hypersql的源码修改

    然而,通过深入Hypersql的源码,我们可以找到处理SQL语句解析和执行的部分,比如`org.hsqldb.ParserDQL`和`org.hsqldb.Statement`类。我们可以在此基础上扩展语法,允许用户在一条语句中嵌套多个操作,比如使用分号...

    spring+hibernate+hsqldb 测试

    6. **源码分析**: 为了深入了解这个项目,我们需要查看源码中的配置文件(如`applicationContext.xml`或`persistence.xml`)以了解Spring和Hibernate的具体配置,以及测试类(可能是以`*Test.java`结尾的文件)来...

    jbpm4 测试例子(项目中含hsqldb数据库)

    HSQldb是纯Java实现的关系型数据库,支持SQL标准,包括事务处理、多用户访问和并发控制。在jbpm4测试例子中,hsqldb主要用于存储流程实例、任务和其他jbpm相关数据。它的优点包括: 1. **轻量级**: 不需要额外的...

    DButils 的源代码

    在分析 DButils 和 HSQldb 的源代码时,你可以深入理解它们如何实现连接池、查询处理、事务控制以及错误处理等机制。同时,通过查看两者交互的部分,可以学习到如何将一个数据库引擎与 Java 应用程序进行集成。这样...

    HSQL源代码

    《深入理解HSQL源代码与Eclipse环境搭建》 HSQLDB,全称为HyperSQL Database,是一款...通过Eclipse这样的集成开发环境,我们可以更高效地进行源码分析、调试和测试,从而更好地利用HSQLDB这一强大的开源数据库系统。

    SSH项目源码完整下载学生管理包括权限系统

    在学生管理系统中,Spring可能用于管理数据库连接、事务控制以及服务层和DAO层对象的实例化。此外,Spring的AOP可以用于实现权限控制,例如在方法调用前进行权限校验。 Hibernate作为持久层框架,简化了数据库操作...

    springMVC+jpetstore+mysql工程源码

    【标题】"springMVC+jpetstore+mysql工程源码"是一个基于...通过阅读和分析源码,开发者不仅可以了解SpringMVC的基本用法,还能学习到如何将项目部署在Eclipse这样的IDE中,进行调试和测试,进一步提高开发效率。

    基于MyBatis框架的深度解析项目.zip

    基于MyBatis框架的深度解析项目 项目概述 本项目旨在深入解析MyBatis框架的核心功能和实现原理,涵盖从基础... JDBC事务讲解JDBC中的事务管理,包括事务边界、隔离级别等。 第3章MyBatis常用工具类 SQL类生成语

    嵌入式数据库H2开始服务

    总的来说,"嵌入式数据库H2开始服务"这个主题涵盖了H2数据库的基本概念、特点、使用方式以及源码分析,对于理解并应用H2数据库有着重要的指导作用。在实际开发中,掌握这些知识点将有助于提升开发效率和项目质量。

    JAVA内存数据库使用demo

    这种数据库因为数据读写速度快,响应时间短,适用于处理大量实时数据的场景,如高速缓存、实时分析等。本示例将围绕如何在Java中使用内存数据库进行演示。 在Java中,一个常见的内存数据库是HSQLDB(HyperSQL ...

    Spring数据库访问(HSQL)(四)

    Spring是Java领域最广泛使用的轻量级框架之一,它为开发者提供了全面的事务管理、数据访问集成以及IoC(Inversion of Control,控制反转)等功能。HSQL数据库则是一个轻量级、高性能的开源关系型数据库,常用于测试...

    翻译日程表

    源码分析和理解对于开发者来说是至关重要的,因为它能帮助他们了解软件的工作原理,以及如何自定义或扩展其功能。 “工具”标签则意味着这个话题可能与软件工具相关,可能是专门用于管理翻译项目、版本控制、任务...

    SpringBoot SpringCloud Redis

    标题和描述中提到的“SpringBoot、SpringCloud的案例源码,以及Redis安装,并附带上高可用集群”,涉及的知识点涵盖了SpringBoot、SpringCloud、Redis以及高可用集群的构建。以下是对这些技术知识点的详细介绍: ...

    java上万个实例源代码例子 链接搜集

    根据提供的标题、描述以及部分内容,我们可以总结出一系列与Java编程语言相关的知识点,特别是与实例源代码及Hibernate框架有关的部分。接下来将对这些知识点进行详细解释。 ### 一、Java实例源代码 #### 1. Java...

Global site tag (gtag.js) - Google Analytics