分布式事务服务 DTS二
如何玩转 DTS,基本上使用 DTS 对发起方的配置要求会多一点。
添加 DTS 的依赖
NOTE: 发起方和参与方都需要添加依赖。
如果使用 SOFA Lite,只需按照样例工程里的方式添加依赖:
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>slite-starter-xts</artifactId>
</dependency>
如果没有使用 SOFA Lite,那么需要在 pom 配置里加上 DTS 的依赖:
<dependency>
<groupId>com.alipay.xts</groupId>
<artifactId>xts-core</artifactId>
<version>6.0.8</version>
</dependency>
<dependency>
<groupId>com.alipay.xts</groupId>
<artifactId>xts-adapter-sofa</artifactId>
<version>6.0.8</version>
</dependency>
场景介绍
- 首先我们假想这样一种场景:转账服务,从银行 A 某个账户转 100 元钱到银行 B 的某个账户,银行 A 和银行 B 可以认为是两个单独的系统,也就是两套单独的数据库。
- 我们将账户系统简化成只有账户和余额 2 个字段,并且为了适应 DTS 的两阶段设计要求,业务上又增加了一个冻结金额(冻结金额是指在一笔转账期间,在一阶段的时候使用该字段临时存储转账金额,该转账额度不能被使用,只有等这笔分布式事务全部提交成功时,才会真正的计入可用余额)。按这样的设计,用户的可用余额等于账户余额减去冻结金额。这点是理解参与者设计的关键,也是 DTS 保证最终一致的业务约束。
- 同时为了记录账户操作明细,我们设计了一张账户流水表用来记录每次账户的操作明细,所以领域对象简单设计如下:
public class Account {
/**
* 账户
*/
private String accountNo;
/**
* 余额
*/
private double amount;
/**
* 冻结金额
*/
private double freezedAmount;
public class AccountTransaction {
/**
* 事务id
*/
private String txId;
/**
* 操作账户
*/
private String accountNo;
/**
* 操作金额
*/
private double amount;
/**
* 操作类型,扣帐还是入账
*/
private String type;
A 银行参与者
我们假设需要从 A 账户扣 100 元钱,所以 A 系统提供了一个扣帐的服务,对应扣帐的一阶段接口和相应的二阶段接口如下:
/**
* A银行参与者,执行扣帐操作
* @version $Id: FirstAction.java, v 0.1 2014年9月22日 下午5:32:59 Exp $
*/
public interface FirstAction {
/**
* 一阶段方法,注意要打上xts的标注哦
*
* @param businessActionContext
* @param accountNo
* @param amount
*/
@TwoPhaseBusinessAction(name = "firstAction", commitMethod = "commit", rollbackMethod = "rollback")
public void prepare_minus(BusinessActionContext businessActionContext,String accountNo,double amount);
/**
* 二阶段的提交方法
* @param businessActionContext
* @return
*/
public boolean commit(BusinessActionContext businessActionContext);
/**
* 二阶段的回滚方法
* @param businessActionContext
* @return
*/
public boolean rollback(BusinessActionContext businessActionContext);
}
对应的一阶段扣帐实现
public void prepare_minus(final BusinessActionContext businessActionContext,
final String accountNo, final double amount) {
transactionTemplate.execute(new TransactionCallback() {
@Override
public Object doInTransaction(TransactionStatus status) {
try {
try {
//锁定账户
Account account = accountDAO.getAccount(accountNo);
if (account.getAmount() - amount < 0) {
throw new TransactionFailException("余额不足");
}
//先记一笔账户操作流水
AccountTransaction accountTransaction = new AccountTransaction();
accountTransaction.setTxId(businessActionContext.getTxId());
accountTransaction.setAccountNo(accountNo);
accountTransaction.setAmount(amount);
accountTransaction.setType("minus");
//初始状态,如果提交则更新为C状态,如果失败则删除记录
accountTransaction.setStatus("I");
accountTransactionDAO.addTransaction(accountTransaction);
//再递增冻结金额,表示这部分钱已经被冻结,不能使用
double freezedAmount = account.getFreezedAmount() + amount;
account.setFreezedAmount(freezedAmount);
accountDAO.updateFreezedAmount(account);
} catch (Exception e) {
System.out.println("一阶段异常," + e);
throw new TransactionFailException("一阶段操作失败", e);
}
return null;
}
});
}
对应的二阶段提交操作
public boolean commit(final BusinessActionContext businessActionContext) {
transactionTemplate.execute(new TransactionCallback() {
@Override
public Object doInTransaction(TransactionStatus status) {
try {
//找到账户操作流水
AccountTransaction accountTransaction = accountTransactionDAO
.findTransaction(businessActionContext.getTxId());
//事务数据被删除了
if (accountTransaction == null) {
throw new TransactionFailException("事务信息被删除");
}
//重复提交幂等保证只做一次
if (StringUtils.equalsIgnoreCase("C", accountTransaction.getStatus())) {
return true;
}
Account account = accountDAO.getAccount(accountTransaction.getAccountNo());
//扣钱
double amount = account.getAmount() - accountTransaction.getAmount();
if (amount < 0) {
throw new TransactionFailException("余额不足");
}
account.setAmount(amount);
accountDAO.updateAmount(account);
//冻结金额相应减少
account.setFreezedAmount(account.getFreezedAmount()
- accountTransaction.getAmount());
accountDAO.updateFreezedAmount(account);
//事务成功之后更新为C
accountTransactionDAO.updateTransaction(businessActionContext.getTxId(), "C");
} catch (Exception e) {
System.out.println("二阶段异常," + e);
throw new TransactionFailException("二阶段操作失败", e);
}
return null;
}
});
return false;
}
对应的二阶段回滚操作
public boolean rollback(final BusinessActionContext businessActionContext) {
transactionTemplate.execute(new TransactionCallback() {
@Override
public Object doInTransaction(TransactionStatus status) {
try {
//回滚冻结金额
AccountTransaction accountTransaction = accountTransactionDAO
.findTransaction(businessActionContext.getTxId());
if (accountTransaction == null) {
System.out.println("二阶段---空回滚成功");
return null;
}
Account account = accountDAO.getAccount(accountTransaction.getAccountNo());
account.setFreezedAmount(account.getFreezedAmount()
- accountTransaction.getAmount());
accountDAO.updateFreezedAmount(account);
//删除流水
accountTransactionDAO.deleteTransaction(businessActionContext.getTxId());
} catch (Exception e) {
System.out.println("二阶段异常," + e);
throw new TransactionFailException("二阶段操作失败", e);
}
return null;
}
});
return false;
}
B 银行参与者
我们假设需要对 B 账户入账 100 元钱,所以 B 系统提供了一个入账的服务,对应入账的一阶段接口和相应的二阶段接口基本和 A 银行参与者类似,这里不多做介绍,可以直接查看样例工程下的 xts-sample 工程代码。
发起方
前面介绍了参与者的实现细节,接下来看看发起方系统是如何协调这 2 个参与者,达到分布式事务下数据的最终一致性的。相比参与者,发起方的配置要复杂一些。
- 在发起方自己的数据库里创建 DTS 的表
- 配置 BusinessActivityControlService
BusinessActivityControlService 是 DTS 分布式事务的启动类,在 SOFA 环境中,我们可以这样使用
<!-- 分布式事务的服务,用来发起分布式事务 -->
<sofa:xts id="businessActivityControlService">
<!-- 发起方自己的数据源,建议使用zdal数据源组件,这里简单使用dbcp数据源 -->
<sofa:datasource ref="activityDataSource"/>
<!-- 如果使用zdal数据源,可以不用配置这个属性,这个dbType是用来区分目标库的类型,以方便xts设置sqlmap -->
<sofa:dbtype value="mysql"/>
</sofa:xts>
在其他环境中,我们也可以将它配置成一个普通 Bean,配置如下
<!-- 分布式事务的服务,用来发起分布式事务 -->
<bean name="businessActivityControlService" class="com.alipay.xts.client.api.impl.sofa.BusinessActivityControlServiceImplSofa">
<!-- 发起方自己的数据源,建议使用zdal数据源组件,这里简单使用dbcp数据源 -->
<property name="dataSource" ref="activityDataSource"/>
<!-- 如果使用zdal数据源,可以不用配置这个属性,这个dbType是用来区分目标库的类型,以方便xts设置sqlmap -->
<property name="dbType" value="mysql"/>
</bean>
- 配置参与者服务和拦截器。如果是在 SOFA 环境中,DTS 框架会自动拦截参与者方法,拦截器就不用配置了
<!-- 第一个参与者的代理 -->
<bean id="firstAction" class="org.springframework.aop.framework.ProxyFactoryBean">
<property name="proxyInterfaces" value="com.alipay.xts.client.sample.action.FirstAction"/>
<property name="target" ref="firstActionTarget"/>
<property name="interceptorNames">
<list>
<value>businessActionInterceptor</value>
</list>
</property>
</bean>
<!-- 第一个参与者 -->
<bean id="firstActionTarget" class="com.alipay.xts.client.sample.action.impl.FirstActionImpl">
<property name="accountTransactionDAO">
<ref bean="firstActionAccountTransactionDAO" />
</property>
<property name="accountDAO">
<ref bean="firstActionAccountDAO" />
</property>
<property name="transactionTemplate">
<ref bean="firstActionTransactionTemplate" />
</property>
</bean>
<!-- 第二个参与者的代理 -->
<bean id="secondAction" class="org.springframework.aop.framework.ProxyFactoryBean">
<property name="proxyInterfaces" value="com.alipay.xts.client.sample.action.SecondAction"/>
<property name="target" ref="secondActionTarget"/>
<property name="interceptorNames">
<list>
<value>businessActionInterceptor</value>
</list>
</property>
</bean>
<!-- 第二个参与者 -->
<bean id="secondActionTarget" class="com.alipay.xts.client.sample.action.impl.SecondActionImpl">
<property name="accountTransactionDAO">
<ref bean="secondActionAccountTransactionDAO" />
</property>
<property name="accountDAO">
<ref bean="secondActionAccountDAO" />
</property>
<property name="transactionTemplate">
<ref bean="secondActionTransactionTemplate" />
</property>
</bean>
<!-- 拦截器,在参与者调用前生效,插入参与者的action记录 -->
<bean id="businessActionInterceptor"
class="com.alipay.sofa.platform.xts.bacs.integration.BusinessActionInterceptor">
<property name="businessActivityControlService" ref="businessActivityControlService"/>
</bean>
- 发起分布式事务
启动分布式事务的入口方法
/**
* 启动一个业务活动。
*
* 为了保证业务活动的唯一性,对同样的businessType与businessId,只能有一次成功记录。
*
* 系统允许多次调用start方式启动业务活动,如果当前业务活动已经存在,再次启动业务活动不会有任何效果,也不会检查业务类型与业务号是否匹配。
*
* @param businessType 业务类型,由业务系统自定义,比如'trade_pay'代表交易支付
* @param businessId 业务号,如交易号
* @notice 事务号的格式为: businessType+"-"+businessId,总长度为128
* @return
*/
BusinessActivityId start(String businessType, String businessId, Map<String, Object> properties);
businessType + businessId 就是最终的事务号,properties 可以让发起方设置一些全局的事务上下文信息。
转账服务发起分布式事务
/**
* 执行转账操作
*
* @param from
* @param to
* @param amount
*/
public void transfer(final String from, final String to, final double amount) {
/**
* 注意:开启xts服务必须包含在发起方的本地事务模版中
*/
transactionTemplate.execute(new TransactionCallback() {
@Override
public Object doInTransaction(TransactionStatus status) {
System.out.println("开始启动xts分布式事务活动");
//启动分布式事务,第三个是分布式事务的全局上下文信息
Map<String, Object> properties = new HashMap<String, Object>();
BusinessActivityId businessActivityId = businessActivityControlService.start("pay",
businessId, properties);
System.out.println("=====启动分布式事务成功,事务号:" + businessActivityId.toStringForm()
+ "=====");
System.out.println("=====一阶段,准备从B银行执行入账操作=====");
//第二个参与者入账操作
if (secondAction.prepare_add(null, to, amount)) {
System.out.println("=====一阶段,从B银行执行入账操作成功=====");
} else {
System.out.println("=====一阶段,从B银行执行入账操作失败,准备回滚=====");
status.setRollbackOnly();
return null;
}
System.out.println("=====一阶段,准备从A银行执行扣账操作=====");
//第一个参与者扣账操作
if (firstAction.prepare_minus(null, from, amount)) {
System.out.println("=====一阶段,从A银行执行扣账操作成功=====");
} else {
System.out.println("=====一阶段,从A银行执行扣账操作失败,准备回滚=====");
status.setRollbackOnly();
}
return null;
}
});
System.out.println("二阶段----转账成功,钱已到位");
}
小结
使用 DTS 开发需要关注的就是以上内容。对于参与者来说,最关键的是业务上如何实现两阶段处理来保证最终一致性,对于发起方来说,主要是要配置 DTS 的表。
http://blog.csdn.net/qq_27384769/article/details/79303942
相关推荐
分布式事务服务 (Distributed Transaction Service, DTS) 是一个分布式事务框架,用来保障在大规模分布式环境下事务的最终一致性。DTS 从架构上分为 dts-core 、dts-schedule、 dts-server 两部分,dts-core是一个...
【钢铁厂分布式光纤(DTS)测温系统设计方案】 在钢铁厂的日常运营中,安全是至关重要的,而消防安全则是其中的基石。分布式光纤测温系统(Distributed Temperature Sensing,简称DTS)是一种利用光纤作为传感器,...
根据给出的文件信息,文件标题为《天然气水合物试采中分布式光纤测温(DTS)数据现场处理及可视化》,描述中仅提供了标签“分布式 分布式系统 分布式开发 参考文献 专业指导”,以及部分内容涉及期刊信息、引用格式和...
钢铁厂分布式光纤DTS测温系统设计方案.doc
分布式省、地DTS系统互联信息接口设计及实现的知识点: 1. DTS系统互联的目的与重要性:省、地调DTS系统间的互联是为了确保省、地联合反事故演习的顺利进行。该演习是检验省、地两级调度员协同处理电网事故能力的...
因此,构建一个分布式系统服务链追踪与监控平台(DTS平台)变得尤为必要。 2. DTS平台介绍 DTS平台是一个低损耗、应用透明的APM系统,能够在不改动应用层代码的前提下,通过在CRM系统基础框架中植入探针代码进行大...
分布式光纤测温系统(Distributed Temperature Sensing,简称DTS)是一种先进的温度监测技术,它利用光纤本身的特性作为传感器,可以实现对长距离、大范围内的连续、实时、精确的温度测量。本资料集主要涵盖了DTS的...
总的来说,"DTS.rar_dts_linux 平台" 提供了一套针对Linux的分布式时间服务解决方案,其重要性在于它能确保分布式系统中的时间一致性,这对于许多关键业务场景来说是必不可少的。通过对DTS的深入了解和正确部署,...
针对企业信息化建设过程中需要集成数据孤岛的问题,采用建设共享数据仓库的思想,利用完成 ETL 功能的...给出利用 DTS 组件集成分布式数据库系统的方案及其总体结构,以及在 B/S 模式下调度 DTS 包的部分关键实现代码。
阿里云专有云企业版数据传输服务DTS用户指南 阿里云专有云企业版数据传输服务DTS用户指南是阿里云提供的一款数据传输服务,旨在帮助用户快速、安全地将数据迁移到阿里云平台。该指南详细介绍了数据传输服务的使用...
DTS为应用程序提供了包括通信、压缩、加密、事务处理、安全性保障以及容错等在内的多项基础服务。DTS的主要作用是屏蔽底层硬件设备、操作系统平台的复杂性,使得应用程序能够专注于业务逻辑的实现。 在分布式交易...
分布式协同仿真技术在电力系统仿真培训中的应用越来越广泛,尤其是对于电网调度员培训仿真系统(DTS)来说,这一技术的发展与实践意义重大。DTS系统是一种能够模拟电网操作和故障恢复的仿真系统,对于提升调度员在...
阿里云专有云企业版 V3.5.2 数据传输服务DTS 产品简介 阿里云数据传输服务DTS是一种专门为企业用户设计的数据传输解决方案,该解决方案可以帮助企业用户快速、安全、可靠地传输大量数据。下面是阿里云数据传输服务...
阿里云专有云Enterprise版V3.5.0的数据传输服务DTS,是阿里云推出的一款高效、稳定、安全的数据迁移和同步工具。这款产品旨在帮助企业实现不同数据库之间的数据迁移,支持实时增量数据同步,从而满足企业在云计算...
DTS光纤分布式温度报警系统是一种先进的温度监测技术,其在清水营选煤厂的应用是煤炭行业自动化与安全监测技术结合的典范。该技术的应用提升了选煤厂的安全运行水平,对于煤炭行业在自动化、智能化发展方面具有示范...
阿里云专有云企业版V3.5.2数据传输服务DTS技术白皮书概述 阿里云专有云企业版V3.5.2数据传输服务DTS技术白皮书是阿里云提供的一份技术白皮书,旨在为用户提供数据传输服务的技术文档。本文档涵盖了DTS技术的概述、...
光纤分布式温度测量系统(DTS)是一种先进的在线监测技术,广泛应用于多个领域,包括水电站建设中的温度监测。张河湾抽水蓄能电站位于河北省石家庄市附近,总装机容量达到1000MW。该电站上水库工程采用沥青混凝土...
MX Player DTS 包是一款专为MX Player播放器设计的扩展包,旨在解决用户在播放包含DTS(Digital Theater System)音效的视频时遇到的无声或音质问题。DTS是一种高级的多声道音频编码格式,常用于电影和高质量的家庭...
标题“dts:基于spring的dts简单实现”表明我们要讨论的是一个与DTS(可能是Data Transaction Service数据事务服务)相关的项目,这个项目是基于Spring框架来实现的。Spring是一个广泛使用的Java企业级应用开发框架,...
阿里云专有云企业版数据传输服务DTS 用户指南 V3.5.2 阿里云专有云企业版数据传输服务DTS 用户指南 V3.5.2 是阿里云提供的一款数据传输服务,旨在帮助用户快速、安全、可靠地传输数据。该服务提供了多种数据传输...