- 浏览: 190061 次
- 性别:
- 来自: 上海
文章分类
最新评论
两种方案:
1、分布式事务 jta
2、事务补偿
3 二阶段提交
分布式事务,记得google有篇关于存储的论文专门讲这个。分布式事务要保证的100%一致性基本不可能,特别是异构数据库。我的建议是降低实时性要求,通过对账,应答的方式识别业务失败,再进行修复,这样更具可操作性。
你可以把对每个库的操作都独立开来,一个发生异常,其他都还原。涉及还原的问题就看你自己怎么解决了,有的可以用rollback,有的则需要事先备份到临时表的。
为什么要替代分布式事务?
当我们系统的数据量很大,大都需要对数据库进行分割,部署多台数据库实例,这样就避免不了某些操作需要同时修改几个数据库实例
里的数据,为了保证数据准确性和一致性,我们大都使用分布式事务来实现(非常经典的两阶段提交协议)。
分布式事务最大的优点就是简化应用开发,对于时间紧迫并且性能要求不高的系统可以大大的提高开发效率,这也是大多开发者沉醉于
其中的主要原因。但没有十全十美的,有利必有弊,虽然开发便捷了,但是也严重的损害了系统的可用性,高性能和可扩展性,尤其对于
海量数据复杂的系统体现就更明显。
系统的可用性
系统的可用性就相当于参加分布式事务的各个数据库实例的可用性之积,数据库实例越多,可用性下降的越明显;因为参加分布式事务
的所有数据库实例都可以正常工作下,这个分布式事务才算完成,如果有一个数据库实例有故障,那这个分布式事务都会失败。
高效能和可伸缩性
对于一个分布式事务总的持续时间是操作各个数据库实例的时间之和,因为在分布式事务中每个操作是顺序执行的,这样每个事务的响
应时间就会很长;还有对于一个OLTP系统,事务都很小,一般几毫秒,当涉及到分布式事务时,节点间的网络通信时间占事务总响应时
间的比例也是不容忽视的。还有由于事务时间相对于变长了,锁定的资源的时间也就变长了。从而严重影响系统的并发性,吞吐率和可
伸缩性。
事务补偿机制
事务补偿即在事务链中的任何一个正向事务操作,都必须存在一个完全符合回滚规则的可逆事务。如果是一个完整的事务链,则必须事务链中的每一个业务服务或操作都有对应的可逆服务。对于Service服务本身无状态,也不容易实现前面讨论过的通过DTC或XA机制实现的跨应用和资源的事务管理,建立跨资源的事务上下文。因此也较难以实现真正的预提交和正式提交的分离。
在这种情况下以上面例子来说,首先调用取款服务,完全调用成功并返回,数据已经持久化。然后调用异地的存款服务,如果也调用成功,则本身无任何问题。如果调用失败,则需要调用本地注册的逆向服务(本地存款服务),如果本地存款服务调用失败,则必须考虑重试,如果约定重试次数仍然不成功,则必须log到完整的不一致信息。也可以是将本地存款服务作为消息发送到消息中间件,由消息中间件接管后续操作。
在上面方式中可以看到需要手工编写大量的代码来处理以保证事务的完整性,我们可以考虑实现一个通用的事务管理器,实现事务链和事务上下文的管理。对于事务链上的任何一个服务正向和逆向操作均在事务管理和协同器上注册,由事务管理器接管所有的事务补偿和回滚操作。
基于消息的最终一致性
在这里首先要回答的是我们需要时实时一致性还是最终一致性的问题,如果需要的是最终一致性,那么BASE策略中的基于消息的最终一致性是比较好的解决方案。这种方案真正实现了两个服务的真正解耦,解耦的关键就是异步消息和消息持久化机制。
还是以上面的例子来看。对于转账操作,原有的两个服务调用变化为第一步调用本地的取款服务,第二步发送异地取款的异步消息到消息中间件。如果第二步在本地,则保证事务的完整性基本无任何问题,即本身就是本地事务的管理机制。只要两个操作都成功即可以返回客户成功。
由于解耦,我们看到客户得到成功返回的时候,如果是上面一种情况则异地卡马上就能查询账户存款增加。而第二种情况则不一定,因为本身是一种异步处理机制。消息中间件得到消息后会去对消息解析,然后调用异地银行提供的存款服务进行存款,如果服务调用失败则进行重试。
异地银行存款操作不应该长久地出现异常而无法使用,因此一旦发现异常我们可以迅速的解决,消息中间件中异常服务自然会进行重试以保证事务的最终一致性。这种方式假设问题一定可以解决,在不到万不得已的情况下本地的取款服务一般不进行可逆操作。
在本地取款到异地存款两个服务调用之间,会存在一个真空期,这段时间相关现金不在任何一个账户,而只是在一个事务的中间状态,但是客户并不关心这个,只要在约定的时间保证事务最终的一致性即可。
关于等幂操作的问题
重复调用多次产生的业务结果与调用一次产生的业务结果相同,简单点讲所有提供的业务服务,不管是正向还是逆向的业务服务,都必须要支持重试。因为服务调用失败这种异常必须考虑到,不能因为服务的多次调用而导致业务数据的累计增加或减少。
关于是否可以补偿的问题
在这里我们谈的是多个跨系统的业务服务组合成一个分布式事务,因此在对事务进行补偿的时候必须要考虑客户需要的是否一定是最终一致性。客户对中间阶段出现的不一致的承受度是如何的。
在上面的例子来看,如果采用事务补偿机制,基本可以是做到准实时的补偿,不会有太大的影响。而如果采用基于消息的最终一致性方式,则可能整个周期比较长,需要较长的时间才能给得到最终的一致性。比如周六转款,客户可能下周一才得到通知转账不成功而进行了回退,那么就必须要考虑客户是否能给忍受。
其次对于前面讨论,如果真正需要的是实时的一致性,那么即使采用事务补偿机制,也无法达到实时的一致性。即很可能在两个业务服务调用中间,客户前台业务操作对持久化的数据进行了其它额外的操作。在这种模式下,我们不得不考虑需要在数据库表增加业务状态锁的问题,即整个事务没有完整提交并成功前,第一个业务服务调用虽然持久化在数据库,但是仍然是一个中间状态,需要通过业务锁来标记,控制相关的业务操作和行为。但是在这种模式下无疑增加了整个分布式业务系统的复杂度。
消息队列-状态表方案和分布式事务的对比
对于时间紧迫或者对性能要求不高的系统,应采用分布式事务加快开发效率;对于时间需求不是很紧,对性能要求很高的系统,
应考虑使用消息队列方案。所以时间与便捷,性能与扩展是需要仔细衡量的,找好中间的平衡点;对于原来使用分布式事务,
且系统已趋于稳定,性能要求高的系统,则可以使用消息队列-状态表方案进行重构来优化性能。
两阶段提交(2PC)
阿里也在用二阶段提交
两阶段提交协议可以保证数据的强一致性,许多分布式关系型数据管理系统采用此协议来完成分布式事务。它是协调所有分布式原子事务参与者,并决定提交或取消(回滚)的分布式算法。同时也是解决一致性问题的算法。该算法能够解决很多的临时性系统故障(包括进程、网络节点、通信等故障),被广泛地使用。但是,它并不能够通过配置来解决所有的故障,在某些情况下它还需要人为的参与才能解决问题。
顾名思义,两阶段提交分为以下两个阶段:
1)Prepare Phase (准备节点)
2)Commit Phase (提交阶段)
1)Prepare Phase
在请求阶段,协调者将通知事务参与者准备提交或取消事务,然后进入表决过程。在表决过程中,参与者将告知协调者自己的决策:同意(事务参与者本地作业执行成功)或取消(本地作业执行故障)。
为了完成准准备阶段,除了commit point site外,其它的数据库节点按照以下步骤执行:
每个节点检查自己是否被其它节点所引用,如果有,就通知这些节点准备提交(进入 Prepare阶段)。
每个节点检查自己运行的事务,如果发现本地运行的事务没有修改数据的操作(只读),则跳过后面的步骤,直接返回一个read only给全局协调器。
如果事务需要修改数据,则为事务分配相应的资源用于保证修改的正常进行。
当上面的工作都成功后,给全局协调器返回准备就绪的信息,反之,则返回失败的信息。
2) Commit Phase
在该阶段,协调者将基于第一个阶段的投票结果进行决策:提交或取消。当且仅当所有的参与者同意提交事务协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者取消事务。参与者在接收到协调者发来的消息后将执行响应的操作。
提交阶段按下面的步骤进行:
全局协调器通知 commit point site 进行提交。
commit point site 提交,完成后通知全局协调器。
全局协调器通知其它节点进行提交。
其它节点各自提交本地事务,完成后释放锁和资源。
其它节点通知全局协调器提交完成。
3)结束阶段
全局协调器通知commit point site说所有节点提交完成。
commit point site数据库释放和事务相关的所有资源,然后通知全局协调器。
全局协调器释放自己持有的资源。
分布式事务结束
一般情况下,两阶段提交机制都能较好的运行,当在事务进行过程中,有参与者宕机时,重启以后,可以通过询问其他参与者或者协调者,从而知道这个事务到底提交了没有。当然,这一切的前提都是各个参与者在进行每一步操作时,都会事先写入日志。
唯一一个两阶段提交不能解决的困境是:当协调者在发出commit 消息后宕机,而唯一收到这条命令的一个参与者也宕机了,这个时候这个事务就处于一个未知的状态,没有人知道这个事务到底是提交了还是未提交,从而需要数据库管理员的介入,防止数据库进入一个不一致的状态。当然,如果有一个前提是:所有节点或者网络的异常最终都会恢复,那么这个问题就不存在了,协调者和参与者最终会重启,其他节点也最终会收到commit 的信息。这也符合CAP理论。
以下来自http://blog.csdn.net/ithomer/article/details/10859235
spring的分布式事务
分布式事务是指操作多个数据库之间的事务,spring的org.springframework.transaction.jta.JtaTransactionManager,提供了分布式事务支持。如果使用WAS的JTA支持,把它的属性改为WebSphere对应的TransactionManager。
在tomcat下,是没有分布式事务的,不过可以借助于第三方软件jotm(Java Open Transaction Manager )和AtomikosTransactionsEssentials实现,在spring中分布式事务是通过jta(jotm,atomikos)来进行实现。
1、http://jotm.objectweb.org/
2、http://www.atomikos.com/Main/TransactionsEssentials
一、使用JOTM例子
(1) Dao及实现
GenericDao接口:
[java] view plaincopyprint?
public interface GenericDao {
public int save(String ds, String sql, Object[] obj) throws Exception;
public int findRowCount(String ds, String sql);
}
GenericDaoImpl 实现:
[java] view plaincopyprint?
public class GenericDaoImpl implements GenericDao{
private JdbcTemplate jdbcTemplateA;
private JdbcTemplate jdbcTemplateB;
public void setJdbcTemplateA(JdbcTemplate jdbcTemplate) {
this.jdbcTemplateA = jdbcTemplate;
}
public void setJdbcTemplateB(JdbcTemplate jdbcTemplate) {
this.jdbcTemplateB = jdbcTemplate;
}
public int save(String ds, String sql, Object[] obj) throws Exception{
if(null == ds || "".equals(ds)) return -1;
try{
if(ds.equals("A")){
return this.jdbcTemplateA.update(sql, obj);
}else{
return this.jdbcTemplateB.update(sql, obj);
}
}catch(Exception e){
e.printStackTrace();
throw new Exception("执行" + ds + "数据库时失败!");
}
}
public int findRowCount(String ds, String sql) {
if(null == ds || "".equals(ds)) return -1;
if(ds.equals("A")){
return this.jdbcTemplateA.queryForInt(sql);
}else{
return this.jdbcTemplateB.queryForInt(sql);
}
}
}
(2) Service及实现
UserService 接口:
[java] view plaincopyprint?
public interface UserService {
public void saveUser() throws Exception;
}
UserServiceImpl 实现:
[java] view plaincopyprint?
public class UserServiceImpl implements UserService{
private GenericDao genericDao;
public void setGenericDao(GenericDao genericDao) {
this.genericDao = genericDao;
}
public void saveUser() throws Exception {
String userName = "user_" + Math.round(Math.random()*10000);
System.out.println(userName);
StringBuilder sql = new StringBuilder();
sql.append(" insert into t_user(username, gender) values(?,?); ");
Object[] objs = new Object[]{userName,"1"};
genericDao.save("A", sql.toString(), objs);
sql.delete(0, sql.length());
sql.append(" insert into t_user(name, sex) values(?,?); ");
objs = new Object[]{userName,"男的"};//值超出范围
genericDao.save("B", sql.toString(), objs);
}
}
(3) applicationContext-jotm.xml
[java] view plaincopyprint?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
<description>springJTA</description>
<!--指定Spring配置中用到的属性文件-->
<bean id="propertyConfig"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
</list>
</property>
</bean>
<!-- JOTM实例 -->
<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean">
<property name="defaultTimeout" value="500000"/>
</bean>
<!-- JTA事务管理器 -->
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="jotm" />
</bean>
<!-- 数据源A -->
<bean id="dataSourceA" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm"/>
<property name="driverName" value="${jdbc.driver}"/>
<property name="url" value="${jdbc.url}"/>
</bean>
</property>
<property name="user" value="${jdbc.username}"/>
<property name="password" value="${jdbc.password}"/>
</bean>
<!-- 数据源B -->
<bean id="dataSourceB" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm"/>
<property name="driverName" value="${jdbc2.driver}"/>
<property name="url" value="${jdbc2.url}"/>
</bean>
</property>
<property name="user" value="${jdbc2.username}"/>
<property name="password" value="${jdbc2.password}"/>
</bean>
<bean id = "jdbcTemplateA"
class = "org.springframework.jdbc.core.JdbcTemplate">
<property name = "dataSource" ref="dataSourceA"/>
</bean>
<bean id = "jdbcTemplateB"
class = "org.springframework.jdbc.core.JdbcTemplate">
<property name = "dataSource" ref="dataSourceB"/>
</bean>
<!-- 事务切面配置 -->
<aop:config>
<aop:pointcut id="pointCut"
expression="execution(* com.logcd.service..*.*(..))"/><!-- 包及其子包下的所有方法 -->
<aop:advisor pointcut-ref="pointCut" advice-ref="txAdvice"/>
<aop:advisor pointcut="execution(* *..common.service..*.*(..))" advice-ref="txAdvice"/>
</aop:config>
<!-- 通知配置 -->
<tx:advice id="txAdvice" transaction-manager="jtaTransactionManager">
<tx:attributes>
<tx:method name="delete*" rollback-for="Exception"/>
<tx:method name="save*" rollback-for="Exception"/>
<tx:method name="update*" rollback-for="Exception"/>
<tx:method name="find*" read-only="true" rollback-for="Exception"/>
</tx:attributes>
</tx:advice>
<bean id="genericDao" class="com.logcd.dao.impl.GenericDaoImpl" autowire="byName"> </bean>
<bean id="userService" class="com.logcd.service.impl.UserServiceImpl" autowire="byName"> </bean>
</beans>
(4) 测试
[java] view plaincopyprint?
public class TestUserService{
private static UserService userService;
@BeforeClass
public static void init(){
ApplicationContext app = new ClassPathXmlApplicationContext("applicationContext-jotm.xml");
userService = (UserService)app.getBean("userService");
}
@Test
public void save(){
System.out.println("begin...");
try{
userService.saveUser();
}catch(Exception e){
System.out.println(e.getMessage());
}
System.out.println("finish...");
}
}
二、关于使用atomikos实现
(1) 数据源配置
[java] view plaincopyprint?
<bean id="dataSourceA" class="com.atomikos.jdbc.SimpleDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName">
<value>${datasource.uniqueResourceName}</value>
</property>
<property name="xaDataSourceClassName">
<value>${database.driver_class}</value>
</property>
<property name="xaDataSourceProperties">
<value>URL=${database.url};user=${database.username};password=${database.password}</value>
</property>
<property name="exclusiveConnectionMode">
<value>${connection.exclusive.mode}</value>
</property>
<property name="connectionPoolSize">
<value>${connection.pool.size}</value>
</property>
<property name="connectionTimeout">
<value>${connection.timeout}</value>
</property>
<property name="validatingQuery">
<value>SELECT 1</value>
</property>
</bean>
(2)、事务配置
[java] view plaincopyprint?
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown" value="true"/>
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="${transaction.timeout}"/>
</bean>
<!-- JTA事务管理器 -->
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager"/>
<property name="userTransaction" ref="atomikosUserTransaction"/>
</bean>
<!-- 事务切面配置 -->
<aop:config>
<aop:pointcut id="serviceOperation" expression="execution(* *..service*..*(..))"/>
<aop:advisor pointcut-ref="serviceOperation" advice-ref="txAdvice"/>
</aop:config>
<!-- 通知配置 -->
<tx:advice id="txAdvice" transaction-manager="springTransactionManager">
<tx:attributes>
<tx:method name="*" rollback-for="Exception"/>
</tx:attributes>
</tx:advice>
1、分布式事务 jta
2、事务补偿
3 二阶段提交
分布式事务,记得google有篇关于存储的论文专门讲这个。分布式事务要保证的100%一致性基本不可能,特别是异构数据库。我的建议是降低实时性要求,通过对账,应答的方式识别业务失败,再进行修复,这样更具可操作性。
你可以把对每个库的操作都独立开来,一个发生异常,其他都还原。涉及还原的问题就看你自己怎么解决了,有的可以用rollback,有的则需要事先备份到临时表的。
为什么要替代分布式事务?
当我们系统的数据量很大,大都需要对数据库进行分割,部署多台数据库实例,这样就避免不了某些操作需要同时修改几个数据库实例
里的数据,为了保证数据准确性和一致性,我们大都使用分布式事务来实现(非常经典的两阶段提交协议)。
分布式事务最大的优点就是简化应用开发,对于时间紧迫并且性能要求不高的系统可以大大的提高开发效率,这也是大多开发者沉醉于
其中的主要原因。但没有十全十美的,有利必有弊,虽然开发便捷了,但是也严重的损害了系统的可用性,高性能和可扩展性,尤其对于
海量数据复杂的系统体现就更明显。
系统的可用性
系统的可用性就相当于参加分布式事务的各个数据库实例的可用性之积,数据库实例越多,可用性下降的越明显;因为参加分布式事务
的所有数据库实例都可以正常工作下,这个分布式事务才算完成,如果有一个数据库实例有故障,那这个分布式事务都会失败。
高效能和可伸缩性
对于一个分布式事务总的持续时间是操作各个数据库实例的时间之和,因为在分布式事务中每个操作是顺序执行的,这样每个事务的响
应时间就会很长;还有对于一个OLTP系统,事务都很小,一般几毫秒,当涉及到分布式事务时,节点间的网络通信时间占事务总响应时
间的比例也是不容忽视的。还有由于事务时间相对于变长了,锁定的资源的时间也就变长了。从而严重影响系统的并发性,吞吐率和可
伸缩性。
事务补偿机制
事务补偿即在事务链中的任何一个正向事务操作,都必须存在一个完全符合回滚规则的可逆事务。如果是一个完整的事务链,则必须事务链中的每一个业务服务或操作都有对应的可逆服务。对于Service服务本身无状态,也不容易实现前面讨论过的通过DTC或XA机制实现的跨应用和资源的事务管理,建立跨资源的事务上下文。因此也较难以实现真正的预提交和正式提交的分离。
在这种情况下以上面例子来说,首先调用取款服务,完全调用成功并返回,数据已经持久化。然后调用异地的存款服务,如果也调用成功,则本身无任何问题。如果调用失败,则需要调用本地注册的逆向服务(本地存款服务),如果本地存款服务调用失败,则必须考虑重试,如果约定重试次数仍然不成功,则必须log到完整的不一致信息。也可以是将本地存款服务作为消息发送到消息中间件,由消息中间件接管后续操作。
在上面方式中可以看到需要手工编写大量的代码来处理以保证事务的完整性,我们可以考虑实现一个通用的事务管理器,实现事务链和事务上下文的管理。对于事务链上的任何一个服务正向和逆向操作均在事务管理和协同器上注册,由事务管理器接管所有的事务补偿和回滚操作。
基于消息的最终一致性
在这里首先要回答的是我们需要时实时一致性还是最终一致性的问题,如果需要的是最终一致性,那么BASE策略中的基于消息的最终一致性是比较好的解决方案。这种方案真正实现了两个服务的真正解耦,解耦的关键就是异步消息和消息持久化机制。
还是以上面的例子来看。对于转账操作,原有的两个服务调用变化为第一步调用本地的取款服务,第二步发送异地取款的异步消息到消息中间件。如果第二步在本地,则保证事务的完整性基本无任何问题,即本身就是本地事务的管理机制。只要两个操作都成功即可以返回客户成功。
由于解耦,我们看到客户得到成功返回的时候,如果是上面一种情况则异地卡马上就能查询账户存款增加。而第二种情况则不一定,因为本身是一种异步处理机制。消息中间件得到消息后会去对消息解析,然后调用异地银行提供的存款服务进行存款,如果服务调用失败则进行重试。
异地银行存款操作不应该长久地出现异常而无法使用,因此一旦发现异常我们可以迅速的解决,消息中间件中异常服务自然会进行重试以保证事务的最终一致性。这种方式假设问题一定可以解决,在不到万不得已的情况下本地的取款服务一般不进行可逆操作。
在本地取款到异地存款两个服务调用之间,会存在一个真空期,这段时间相关现金不在任何一个账户,而只是在一个事务的中间状态,但是客户并不关心这个,只要在约定的时间保证事务最终的一致性即可。
关于等幂操作的问题
重复调用多次产生的业务结果与调用一次产生的业务结果相同,简单点讲所有提供的业务服务,不管是正向还是逆向的业务服务,都必须要支持重试。因为服务调用失败这种异常必须考虑到,不能因为服务的多次调用而导致业务数据的累计增加或减少。
关于是否可以补偿的问题
在这里我们谈的是多个跨系统的业务服务组合成一个分布式事务,因此在对事务进行补偿的时候必须要考虑客户需要的是否一定是最终一致性。客户对中间阶段出现的不一致的承受度是如何的。
在上面的例子来看,如果采用事务补偿机制,基本可以是做到准实时的补偿,不会有太大的影响。而如果采用基于消息的最终一致性方式,则可能整个周期比较长,需要较长的时间才能给得到最终的一致性。比如周六转款,客户可能下周一才得到通知转账不成功而进行了回退,那么就必须要考虑客户是否能给忍受。
其次对于前面讨论,如果真正需要的是实时的一致性,那么即使采用事务补偿机制,也无法达到实时的一致性。即很可能在两个业务服务调用中间,客户前台业务操作对持久化的数据进行了其它额外的操作。在这种模式下,我们不得不考虑需要在数据库表增加业务状态锁的问题,即整个事务没有完整提交并成功前,第一个业务服务调用虽然持久化在数据库,但是仍然是一个中间状态,需要通过业务锁来标记,控制相关的业务操作和行为。但是在这种模式下无疑增加了整个分布式业务系统的复杂度。
消息队列-状态表方案和分布式事务的对比
对于时间紧迫或者对性能要求不高的系统,应采用分布式事务加快开发效率;对于时间需求不是很紧,对性能要求很高的系统,
应考虑使用消息队列方案。所以时间与便捷,性能与扩展是需要仔细衡量的,找好中间的平衡点;对于原来使用分布式事务,
且系统已趋于稳定,性能要求高的系统,则可以使用消息队列-状态表方案进行重构来优化性能。
两阶段提交(2PC)
阿里也在用二阶段提交
两阶段提交协议可以保证数据的强一致性,许多分布式关系型数据管理系统采用此协议来完成分布式事务。它是协调所有分布式原子事务参与者,并决定提交或取消(回滚)的分布式算法。同时也是解决一致性问题的算法。该算法能够解决很多的临时性系统故障(包括进程、网络节点、通信等故障),被广泛地使用。但是,它并不能够通过配置来解决所有的故障,在某些情况下它还需要人为的参与才能解决问题。
顾名思义,两阶段提交分为以下两个阶段:
1)Prepare Phase (准备节点)
2)Commit Phase (提交阶段)
1)Prepare Phase
在请求阶段,协调者将通知事务参与者准备提交或取消事务,然后进入表决过程。在表决过程中,参与者将告知协调者自己的决策:同意(事务参与者本地作业执行成功)或取消(本地作业执行故障)。
为了完成准准备阶段,除了commit point site外,其它的数据库节点按照以下步骤执行:
每个节点检查自己是否被其它节点所引用,如果有,就通知这些节点准备提交(进入 Prepare阶段)。
每个节点检查自己运行的事务,如果发现本地运行的事务没有修改数据的操作(只读),则跳过后面的步骤,直接返回一个read only给全局协调器。
如果事务需要修改数据,则为事务分配相应的资源用于保证修改的正常进行。
当上面的工作都成功后,给全局协调器返回准备就绪的信息,反之,则返回失败的信息。
2) Commit Phase
在该阶段,协调者将基于第一个阶段的投票结果进行决策:提交或取消。当且仅当所有的参与者同意提交事务协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者取消事务。参与者在接收到协调者发来的消息后将执行响应的操作。
提交阶段按下面的步骤进行:
全局协调器通知 commit point site 进行提交。
commit point site 提交,完成后通知全局协调器。
全局协调器通知其它节点进行提交。
其它节点各自提交本地事务,完成后释放锁和资源。
其它节点通知全局协调器提交完成。
3)结束阶段
全局协调器通知commit point site说所有节点提交完成。
commit point site数据库释放和事务相关的所有资源,然后通知全局协调器。
全局协调器释放自己持有的资源。
分布式事务结束
一般情况下,两阶段提交机制都能较好的运行,当在事务进行过程中,有参与者宕机时,重启以后,可以通过询问其他参与者或者协调者,从而知道这个事务到底提交了没有。当然,这一切的前提都是各个参与者在进行每一步操作时,都会事先写入日志。
唯一一个两阶段提交不能解决的困境是:当协调者在发出commit 消息后宕机,而唯一收到这条命令的一个参与者也宕机了,这个时候这个事务就处于一个未知的状态,没有人知道这个事务到底是提交了还是未提交,从而需要数据库管理员的介入,防止数据库进入一个不一致的状态。当然,如果有一个前提是:所有节点或者网络的异常最终都会恢复,那么这个问题就不存在了,协调者和参与者最终会重启,其他节点也最终会收到commit 的信息。这也符合CAP理论。
以下来自http://blog.csdn.net/ithomer/article/details/10859235
spring的分布式事务
分布式事务是指操作多个数据库之间的事务,spring的org.springframework.transaction.jta.JtaTransactionManager,提供了分布式事务支持。如果使用WAS的JTA支持,把它的属性改为WebSphere对应的TransactionManager。
在tomcat下,是没有分布式事务的,不过可以借助于第三方软件jotm(Java Open Transaction Manager )和AtomikosTransactionsEssentials实现,在spring中分布式事务是通过jta(jotm,atomikos)来进行实现。
1、http://jotm.objectweb.org/
2、http://www.atomikos.com/Main/TransactionsEssentials
一、使用JOTM例子
(1) Dao及实现
GenericDao接口:
[java] view plaincopyprint?
public interface GenericDao {
public int save(String ds, String sql, Object[] obj) throws Exception;
public int findRowCount(String ds, String sql);
}
GenericDaoImpl 实现:
[java] view plaincopyprint?
public class GenericDaoImpl implements GenericDao{
private JdbcTemplate jdbcTemplateA;
private JdbcTemplate jdbcTemplateB;
public void setJdbcTemplateA(JdbcTemplate jdbcTemplate) {
this.jdbcTemplateA = jdbcTemplate;
}
public void setJdbcTemplateB(JdbcTemplate jdbcTemplate) {
this.jdbcTemplateB = jdbcTemplate;
}
public int save(String ds, String sql, Object[] obj) throws Exception{
if(null == ds || "".equals(ds)) return -1;
try{
if(ds.equals("A")){
return this.jdbcTemplateA.update(sql, obj);
}else{
return this.jdbcTemplateB.update(sql, obj);
}
}catch(Exception e){
e.printStackTrace();
throw new Exception("执行" + ds + "数据库时失败!");
}
}
public int findRowCount(String ds, String sql) {
if(null == ds || "".equals(ds)) return -1;
if(ds.equals("A")){
return this.jdbcTemplateA.queryForInt(sql);
}else{
return this.jdbcTemplateB.queryForInt(sql);
}
}
}
(2) Service及实现
UserService 接口:
[java] view plaincopyprint?
public interface UserService {
public void saveUser() throws Exception;
}
UserServiceImpl 实现:
[java] view plaincopyprint?
public class UserServiceImpl implements UserService{
private GenericDao genericDao;
public void setGenericDao(GenericDao genericDao) {
this.genericDao = genericDao;
}
public void saveUser() throws Exception {
String userName = "user_" + Math.round(Math.random()*10000);
System.out.println(userName);
StringBuilder sql = new StringBuilder();
sql.append(" insert into t_user(username, gender) values(?,?); ");
Object[] objs = new Object[]{userName,"1"};
genericDao.save("A", sql.toString(), objs);
sql.delete(0, sql.length());
sql.append(" insert into t_user(name, sex) values(?,?); ");
objs = new Object[]{userName,"男的"};//值超出范围
genericDao.save("B", sql.toString(), objs);
}
}
(3) applicationContext-jotm.xml
[java] view plaincopyprint?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
<description>springJTA</description>
<!--指定Spring配置中用到的属性文件-->
<bean id="propertyConfig"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
</list>
</property>
</bean>
<!-- JOTM实例 -->
<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean">
<property name="defaultTimeout" value="500000"/>
</bean>
<!-- JTA事务管理器 -->
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="jotm" />
</bean>
<!-- 数据源A -->
<bean id="dataSourceA" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm"/>
<property name="driverName" value="${jdbc.driver}"/>
<property name="url" value="${jdbc.url}"/>
</bean>
</property>
<property name="user" value="${jdbc.username}"/>
<property name="password" value="${jdbc.password}"/>
</bean>
<!-- 数据源B -->
<bean id="dataSourceB" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm"/>
<property name="driverName" value="${jdbc2.driver}"/>
<property name="url" value="${jdbc2.url}"/>
</bean>
</property>
<property name="user" value="${jdbc2.username}"/>
<property name="password" value="${jdbc2.password}"/>
</bean>
<bean id = "jdbcTemplateA"
class = "org.springframework.jdbc.core.JdbcTemplate">
<property name = "dataSource" ref="dataSourceA"/>
</bean>
<bean id = "jdbcTemplateB"
class = "org.springframework.jdbc.core.JdbcTemplate">
<property name = "dataSource" ref="dataSourceB"/>
</bean>
<!-- 事务切面配置 -->
<aop:config>
<aop:pointcut id="pointCut"
expression="execution(* com.logcd.service..*.*(..))"/><!-- 包及其子包下的所有方法 -->
<aop:advisor pointcut-ref="pointCut" advice-ref="txAdvice"/>
<aop:advisor pointcut="execution(* *..common.service..*.*(..))" advice-ref="txAdvice"/>
</aop:config>
<!-- 通知配置 -->
<tx:advice id="txAdvice" transaction-manager="jtaTransactionManager">
<tx:attributes>
<tx:method name="delete*" rollback-for="Exception"/>
<tx:method name="save*" rollback-for="Exception"/>
<tx:method name="update*" rollback-for="Exception"/>
<tx:method name="find*" read-only="true" rollback-for="Exception"/>
</tx:attributes>
</tx:advice>
<bean id="genericDao" class="com.logcd.dao.impl.GenericDaoImpl" autowire="byName"> </bean>
<bean id="userService" class="com.logcd.service.impl.UserServiceImpl" autowire="byName"> </bean>
</beans>
(4) 测试
[java] view plaincopyprint?
public class TestUserService{
private static UserService userService;
@BeforeClass
public static void init(){
ApplicationContext app = new ClassPathXmlApplicationContext("applicationContext-jotm.xml");
userService = (UserService)app.getBean("userService");
}
@Test
public void save(){
System.out.println("begin...");
try{
userService.saveUser();
}catch(Exception e){
System.out.println(e.getMessage());
}
System.out.println("finish...");
}
}
二、关于使用atomikos实现
(1) 数据源配置
[java] view plaincopyprint?
<bean id="dataSourceA" class="com.atomikos.jdbc.SimpleDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName">
<value>${datasource.uniqueResourceName}</value>
</property>
<property name="xaDataSourceClassName">
<value>${database.driver_class}</value>
</property>
<property name="xaDataSourceProperties">
<value>URL=${database.url};user=${database.username};password=${database.password}</value>
</property>
<property name="exclusiveConnectionMode">
<value>${connection.exclusive.mode}</value>
</property>
<property name="connectionPoolSize">
<value>${connection.pool.size}</value>
</property>
<property name="connectionTimeout">
<value>${connection.timeout}</value>
</property>
<property name="validatingQuery">
<value>SELECT 1</value>
</property>
</bean>
(2)、事务配置
[java] view plaincopyprint?
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown" value="true"/>
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="${transaction.timeout}"/>
</bean>
<!-- JTA事务管理器 -->
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager"/>
<property name="userTransaction" ref="atomikosUserTransaction"/>
</bean>
<!-- 事务切面配置 -->
<aop:config>
<aop:pointcut id="serviceOperation" expression="execution(* *..service*..*(..))"/>
<aop:advisor pointcut-ref="serviceOperation" advice-ref="txAdvice"/>
</aop:config>
<!-- 通知配置 -->
<tx:advice id="txAdvice" transaction-manager="springTransactionManager">
<tx:attributes>
<tx:method name="*" rollback-for="Exception"/>
</tx:attributes>
</tx:advice>
发表评论
-
Tomcat的四种基于HTTP协议的Connector性能比较
2017-11-28 10:39 538今天在osc上看到对Tomcat的四种基于HTTP协议的Con ... -
oracle 查看执行计划的方式
2016-12-20 18:16 388一、通过PL/SQL Dev工具 1、直接File- ... -
redis 客户端 jedis
2016-11-09 15:36 509 -
高效序列化工具kryo
2016-11-09 15:29 554 -
Jetty项目简介
2016-11-07 11:28 447jetty是一个开源、基于标准、全功能实现的Java服务器。它 ... -
windows7 64位下git和tortoisegit的安装和使用
2016-09-08 11:35 1560git https://github.com/git-for- ... -
Quartz 表达式
2016-08-26 15:13 383Quartz中时间表达式的设 ... -
org.apache.commons.dbutils
2016-08-26 11:20 362 -
org.quartz
2016-08-26 10:16 532 -
pl/sql 乱码解决
2016-08-25 16:25 332select userenv('language') from ... -
jedis
2016-08-24 18:08 458 -
ActiveMQ的集群多种部署方式
2016-08-15 16:56 679ActiveMQ的多种部署方式 ... -
使用MySQL Proxy解决MySQL主从同步延迟
2016-08-15 16:26 540使用MySQL Proxy解决MySQL主从同步延迟 ... -
待查看
2016-08-02 09:41 4061tair 2 tddl 3hsf 4 分库分表 pmd ... -
redis 原理
2016-07-10 14:50 8371 什么是redis redis是一个key-value存储 ... -
分库分表
2016-07-03 12:30 538第1章 引言 随着互联网 ... -
Notify、MetaQ、Kafka、ActiveMQ
2016-07-03 12:15 7911 Notify Notify是淘宝自主研发的一套消息服务引 ... -
Reactor、Disruptor
2016-04-27 12:55 1079Reactor 主要用于帮助开发者创建基于JVM的异步应用程序 ... -
mybatis 帮助文档
2016-04-22 11:01 509http://www.mybatis.org/mybatis- ... -
Zabbix 监控
2016-04-11 09:54 428
相关推荐
【分布式数据库事务分类策略研究】 在现代信息技术领域,随着数据量的不断增长,传统的集中式数据库系统已经无法满足高并发、大数据量的处理需求。分布式数据库作为一种有效的解决方案,通过将数据分散存储在多个...
例如,传统的两阶段提交协议(2PC)在分布式事务数据库中可能需要进行优化以适应分布式环境,以确保跨存储节点的事务一致性。 高可用性对于银行等金融行业的信息系统来说至关重要,它要求分布式事务数据库能够在面对...
分布式事务处理技术,如两阶段提交(2PC)和三阶段提交协议,用于确保跨多个节点的事务处理既安全又可靠。Paxos和RAFT等共识算法用来保证分布式系统中的节点一致性。此外,分布式存储技术是分布式数据库的另一个关键...
基于移动代理的分布式异构数据库管理系统可以有效降低网络通讯量,支持更大型分布式数据库的大量事务处理,直接支持更动态的分布式数据库事务处理,使分布式数据库的集成、变动更有弹性,同时也能直接支持移动平台...
通过运行和研究Demo,开发者可以快速上手,理解如何配置数据库(如“在config中配置数据库的名称”),以及如何调用统一的方法来执行跨数据库的操作。 标签中的“OracleHelper”、“SqlHelper”和“AccessHelper”...
4. **跨平台兼容性**:随着多云环境的普及,数据库产品需要具备更好的跨平台兼容能力。 #### 六、案例分析与最佳实践 报告还详细分析了几种典型应用场景下的数据库选型和优化策略,包括但不限于电子商务、金融交易...
分布式数据库技术的研究涵盖了数据分布策略、事务处理、查询优化、网络通信协议以及容错和恢复等多个方面。它需要解决的主要挑战包括数据的一致性、可用性、性能和网络延迟等问题。通过采用分布式事务处理模型,如两...
分布式数据库的核心特征之一就是分布式事务处理能力,它确保跨多个节点的事务能够保持原子性、一致性、隔离性和持久性(ACID特性)。分布式事务通常通过一系列分布式协议来实现,其中两阶段提交(2PC)和三阶段提交...
### 嵌入式数据库发展状况研究 #### 引言 随着信息技术的飞速发展,尤其是便携式计算设备及无线通信技术的进步,使得数据管理技术必须适应这一变化。嵌入式数据库作为一种新兴的技术,因其独特的优势而在诸多领域...
在与Oracle数据库交互时,为了确保数据的一致性和完整性,我们可以使用分布式事务(Distributed Transaction)机制,具体表现为XA事务。XA事务是一种两阶段提交(Two-Phase Commit, 2PC)协议,广泛应用于多资源协调...
### OTL跨平台数据库编程与应用 #### 一、引言 OTL(Oracle, ODBC and DB2-CLI Template Library)是一个强大的跨平台数据库编程工具包,它以C++模板库的形式出现,旨在简化数据库应用程序的开发过程,并提高其...
在分布式数据库中,事务需要跨多个节点执行,这涉及到分布式事务协议,如两阶段提交(2PC)、三阶段提交(3PC)以及更现代的Paxos和Raft算法。 4. **负载均衡**:为了优化性能和资源利用,分布式数据库需要智能地...
3. 数据库系统阶段:数据库管理系统(DBMS)的出现,如关系数据库,使得数据管理规范化,支持数据独立性和事务处理,提高了数据的一致性和完整性。 4. 新一代数据库阶段:随着互联网和大数据的崛起,分布式数据库、...
文档的标题为“技能大赛分布式数据库多事务调控模型构建”,而标签包含了“分布式”、“分布式系统”、“分布式开发”和“专业指导”等内容。以下是对这些知识点的详细说明: 分布式数据库是数据库技术和分布式计算...
XML与数据库数据的交互技术是现代信息系统中不可或缺的一部分,尤其是在需要跨平台、跨系统进行数据交换的场景下。通过对XML与关系数据库之间转换技术的研究,我们可以更好地利用这两种技术的优势,实现高效、安全的...
总的来说,这个研究生招生系统数据库课程设计涵盖了软件工程的多个重要方面:后端开发、数据库设计、存储过程编写、GUI编程和事务管理。通过这样的项目,开发者不仅可以深入理解数据库原理,还能提升实际问题解决...
尽管分布式数据库在金融领域展现出了巨大的潜力,但仍然面临挑战,如复杂的数据迁移、运维难度增加、跨节点的事务一致性问题等。未来,随着技术的不断成熟,分布式数据库将在智能化运维、自动化管理、更强的数据处理...
### 基于ArcSDE的空间数据库技术研究 #### 空间数据库技术的重要性与应用场景 随着信息技术的发展,地理信息系统(GIS)的应用越来越广泛。在众多GIS应用中,空间数据库技术发挥着至关重要的作用。空间数据库能够...
- **Jim Gray (1944-2007)**:在数据库研究领域做出了杰出贡献,特别是在事务处理、分布式数据库和大规模数据处理方面的工作,他也因此获得了1998年的图灵奖。 这些杰出人物及其贡献不仅推动了数据库技术的进步,也...