`
qtlkw
  • 浏览: 307113 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Apache Camel框架之事务控制

 
阅读更多
本文简单介绍一下Apache Camel如何对route进行事务控制,首先介绍整个route只涉及到一个事务参与者的情况,然后再介绍route中涉及到多个事务参与者的情况.Camel是通过和Spring的框架集成进行事务控制的.

1,整个route只有一个事务参与者,"局部事务",这里用JMS的例子,后台的MQ为ActiveMQ,示例图如下:(图片来源于Camel in Action)


route的代码如下:
源码打印?

    public class JMSTransaction extends RouteBuilder { 
        public void configure() throws Exception { 
            TProcessor0 p0 = new TProcessor0(); 
            TProcessor1 p1 = new TProcessor1(); 
            from("jms:queue:TOOL.DEFAULT").process(p0).process(p1).to("file:d:/temp/outbox");         
        } 
    } 

Spring配置如下:
源码打印?

    <beans xmlns="http://www.springframework.org/schema/beans" 
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
           xmlns:broker="http://activemq.apache.org/schema/core" 
           xsi:schemaLocation=" 
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd 
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd"> 
        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> 
            <package> 
                com.test.camel.transaction.jms 
            </package> 
        </camelContext>     
        <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
            <property name="transacted" value="true"/> 
            <property name="transactionManager" ref="txManager"/>             
        </bean> 
        <bean id="txManager" class="org.springframework.jms.connection.JmsTransactionManager"> 
            <property name="connectionFactory" ref="jmsConnectionFactory"/> 
        </bean> 
        <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
            <property name="brokerURL" value="tcp://localhost:61616"/>         
        </bean> 
    </beans> 

route定义的逻辑为从queue里取消息,然后进行一系列的处理(process(p0).process(p1)),<property name="transacted" value="true"/>的意思是通过这个jms进行的消息存取是有事务控制的.上面的route在process(p1)里抛出异常,txManager会进行rollback处理.(在activeMQ里,消息默认会redelivery到客户端6次,如果继续异常,消息会放到deadletter queue里(ActiveMQ.DLQ)),需要在AciveMQ的配置文件activemq.xml里配置如下:(non-persistent的queue的消息出错也转到dead letter queue)

<policyEntry queue=">">
<deadLetterStrategy>
   <sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy>

如果<property name="transacted" value="false"/>的话,消息在重发了6次后会丢失.

如果上面例子中的事务参与者是数据库的话,道理与之类似,只是配置的transaction manager不同,如:

<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"/>

Camel里使用ActiveMQ JMS的例子可以参照 http://blog.csdn.net/kkdelta/article/details/7237096

2,Camel里的全局事务,一个route里有多个事务参与者,示例图如下:(图片来源于Camel in Action)


route的定义如下:
源码打印?

    public class XaTransaction extends RouteBuilder { 
        public void configure() throws Exception { 
            TProcessor1 p1 = new TProcessor1(); 
            from("jms:queue:TOOL.DEFAULT") 
            .transacted() 
            .log("+++ before database +++") 
            .bean(SQLBean.class, "toSql") 
            .to("jdbc:myDataSource") 
            .process(p1) 
            .log("+++ after database +++"); 
        } 
    } 
    public class SQLBean { 
        public String toSql(String str) { 
            //create table CamelTEST(msg varchar2(2000)); 
            StringBuilder sb = new StringBuilder(); 
            sb.append("INSERT INTO CamelTEST VALUES ('camel test')"); 
            return sb.toString(); 
        } 
    } 

route的逻辑是从queue里取消息,然后操作数据库,然后做后续其他操作(process(p1)),这里的process(p1)如果抛出异常的话,取消息和数据库操作都回滚,

如果整个route都成功完成的话,取消息和数据库操作提交.

这里用到JTA transaction manager是atomikos,相应的jar包可以从这里下载:http://download.csdn.net/detail/kkdelta/4056226

atomikos的主页 http://www.atomikos.com/Main/ProductsOverview

Spring的配置如下:
源码打印?

    <beans xmlns="http://www.springframework.org/schema/beans" 
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
           xmlns:broker="http://activemq.apache.org/schema/core" 
           xsi:schemaLocation=" 
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd 
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">    
        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> 
            <package> 
                com.test.camel.transaction.xa 
            </package> 
        </camelContext> 
        <bean id="atomikosTransactionManager" 
              class="com.atomikos.icatch.jta.UserTransactionManager" 
              init-method="init" destroy-method="close" > 
            <!-- when close is called, should we force transactions to terminate or not? --> 
            <property name="forceShutdown" value="false"/> 
        </bean> 
        <!-- this is some atomikos setup you must do --> 
        <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" > 
            <property name="transactionTimeout" value="300"/> 
        </bean> 
        <!-- this is some atomikos setup you must do --> 
        <bean id="connectionFactory" 
              class="com.atomikos.jms.AtomikosConnectionFactoryBean" > 
            <property name="uniqueResourceName" value="amq1"/> 
            <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/> 
        </bean> 
        <!-- this is the Spring JtaTransactionManager which under the hood uses Atomikos --> 
        <bean id="jtaTransactionManager" 
              class="org.springframework.transaction.jta.JtaTransactionManager" > 
            <property name="transactionManager" ref="atomikosTransactionManager"/> 
            <property name="userTransaction" ref="atomikosUserTransaction"/> 
        </bean> 
        <!-- Is the ConnectionFactory to connect to the JMS broker --> 
        <!-- notice how we must use the XA connection factory --> 
        <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" > 
            <property name="brokerURL" value="tcp://localhost:61616"/> 
        </bean> 
        <!-- define the activemq Camel component so we can integrate with the AMQ broker below --> 
        <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent" > 
            <property name="transacted" value="true"/> 
            <property name="transactionManager" ref="jtaTransactionManager"/> 
        </bean> 
       <bean id="myDataSource"  
          class="com.atomikos.jdbc.AtomikosDataSourceBean"  
          init-method="init" destroy-method="close">  
           <!-- set an arbitrary but unique name for the datasource -->  
          <property name="uniqueResourceName"><value>XADBMS</value></property>  
          <property name="xaDataSourceClassName">  
             <value>oracle.jdbc.xa.client.OracleXADataSource</value>  
          </property>  
          <property name="xaProperties">  
                    <props>  
                            <prop key="user">xxx</prop>  
                            <prop key="password">xxx</prop>  
                            <prop key="URL">jdbc:oracle:thin:@147.151.240.xxx:1521:orcl</prop> 
                    </props>  
          </property>     
          <property name="poolSize" value="1"/>  
       </bean>     
    </beans>

   
<project>  
      <modelVersion>4.0.0</modelVersion>  
      <groupId>my-osgi-bundles</groupId>  
      <artifactId>examplebundle</artifactId>  
      <packaging>bundle</packaging>    <!-- (1) -->  
      <version>1.0</version>  
      <name>Example Bundle</name>  
      <dependencies>  
        <dependency>  
          <groupId>org.apache.felix</groupId>  
          <artifactId>org.osgi.core</artifactId>  
          <version>1.0.0</version>  
        </dependency>  
      </dependencies>  
      <build>  
        <plugins>  
          <plugin>    <!-- (2) START -->  
            <groupId>org.apache.felix</groupId>  
            <artifactId>maven-bundle-plugin</artifactId>  
            <extensions>true</extensions>  
            <configuration>  
              <instructions>  
                <Export-Package>com.my.company.api</Export-Package>  
                <Private-Package>com.my.company.*</Private-Package>  
                <Bundle-Activator>com.my.company.Activator</Bundle-Activator>  
              </instructions>  
            </configuration>  
          </plugin>    <!-- (2) END -->  
        </plugins>  
      </build>  
    </project>
 
分享到:
评论

相关推荐

    ApacheCamel-JDBC

    Apache Camel 是一个流行的开源集成框架,它允许开发者以声明式的方式定义路由和转换数据,使得在不同的软件组件之间实现通信变得更加简单。JDBC(Java Database Connectivity)是Java平台中的一个标准API,用于与...

    Apache Camel中文开发使用指南.zip

    Apache Camel 是一个强大的开源框架,专门用于构建企业级应用程序中的集成解决方案。它提供了一种声明式的方式,使得开发者可以轻松地定义数据路由和处理规则,从而实现系统间的通信。这个"Apache Camel 开发使用...

    apache-camel-3.7.0_数据同步_

    Apache Camel 是一个强大的开源框架,专门用于构建企业级集成解决方案。在标题“apache-camel-3.7.0_数据同步_”中提到的“数据同步”,是指利用Apache Camel实现不同系统、数据库或应用程序间的数据交换和一致性...

    Apache Camel 源码分析.rar

    Apache Camel 是一个强大的开源企业集成库,它提供了一种声明式的方式来定义路由和转换数据,使得构建复杂的分布式系统变得更加简单。Camel 使用一种名为“DSL”(Domain Specific Language)的语法规则,允许开发者...

    apache camel

    Apache Camel 是一个强大的开源集成框架,它允许开发者在各种企业应用程序之间建立灵活、可重用的数据路由和集成解决方案。这个框架的核心理念是“统一的消息模型”,它提供了多种组件,支持多种协议和数据格式,...

    apache camel 简介

    Camel 利用了Spring框架的特性,如声明式事务、控制反转配置以及与JMS、JDBC和JPA的集成,降低了复杂度,减少了XML配置的需求。通过这种方式,Camel 提高了代码的可读性和可维护性。 在实际使用中,Apache Camel ...

    Camel服务集成,服务编排操作文档

    Apache Camel 是一个强大的开源框架,专门用于构建企业级应用程序中的集成解决方案。它提供了一种声明式的方式来进行服务集成和服务编排,使得开发者可以方便地连接不同的系统、协议和API,从而构建复杂的数据流。在...

    Camel in action(camel实战)

    Apache Camel 是一个强大的 Java 框架,它使得开发者能够轻松地实现企业级集成模式。通过简洁而强大的领域特定语言(DSL),开发者可以像拼接乐高积木一样将集成逻辑无缝嵌入到应用程序中。Apache Camel 支持超过 80...

    Apache Camel 其他.rar

    Apache Camel 是一个强大的开源集成框架,它简化了企业级应用之间的数据交换和系统集成。这个框架的核心理念是“路由和消息传递”,它提供了一种声明式的方式来定义路由规则,允许开发者使用直观的DSL(领域特定语言...

    apache-camel-1.3.0.tar.gz

    Apache Camel 是一个流行的开源集成框架,它提供了一种声明式的方式来定义业务流程和路由规则,使得开发者能够构建复杂的分布式系统。这个"apache-camel-1.3.0.tar.gz"文件是Apache Camel的一个早期版本,具体为...

    camel文档

    该书全面介绍了Apache Camel这一强大而灵活的企业集成框架的核心概念、设计模式及实际应用场景。 ### 核心知识点概览 #### 第一部分:初次接触Camel(第1章至第2章) - **第1章:遇见Camel** - **简介**:介绍...

    Java_Apache Camel示例.zip

    Apache Camel 是一个流行的开源框架,主要用于构建企业级集成解决方案。它提供了一种声明式的方式,通过使用简单且直观的DSL(领域特定语言)来定义路由和消息转换规则。在这个"Java_Apache Camel示例.zip"压缩包中...

    [Camel实战].(Camel.in.Action).Claus.Ibsen&Jonathan;.Anstey.文字版

    - **事务管理**:介绍了如何在Camel中管理和控制事务,确保数据的一致性和完整性。 - **最佳实践**:分享了关于事务管理的最佳实践和技巧。 ###### 3.2 并发与可扩展性 (Concurrency and scalability) - **并发模型...

    apache-camel-2.4.0-src.zip

    Apache Camel 是一个流行的开源企业集成库,它提供了一种简单且声明式的模型来定义和执行数据路由和消息转换。在2.4.0版本中,这个框架为开发者提供了丰富的组件和功能,使得构建复杂的分布式系统变得更为容易。...

    apache-camel-1.6.3.tar.gz

    Apache Camel 是一个流行的开源集成框架,它提供了一种声明式的方式来构建企业级应用程序的路由和消息传递系统。这个"apache-camel-1.6.3.tar.gz"文件是Apache Camel的1.6.3版本的源码包,采用tar.gz格式进行压缩,...

    apache-camel-2.7.2.zip

    Apache Camel 是一个流行的开源集成框架,它提供了一种声明式的方式来定义业务流程和路由规则,使得开发者能够构建复杂的分布式系统而无需深入理解底层细节。在本文中,我们将深入探讨 Apache Camel 2.7.2 版本的...

    apache-camel-2.0-M1-src.zip

    在开发过程中,Apache Camel 提供了丰富的API和强大的错误处理机制,如处理异常、回退策略、事务控制等。此外,Camel还支持性能监控和日志记录,帮助开发者跟踪和调试路由行为。2.0版本的Camel可能引入了一些新特性...

    apache-camel-1.2.0-src.tar.gz

    Apache Camel 是一个强大的开源企业集成库,它提供了一种模型化的路由和消息处理方式,使得开发者可以使用统一的API来构建复杂的企业级应用程序。在"apache-camel-1.2.0-src.tar.gz"这个压缩包中,包含了Apache ...

    apache-camel-2.7.5.tar.gz

    在Apache Camel 2.7.5中,除了主库之外,可能还有许多依赖的JAR文件,包括Camel的各个组件和其他必要的库,如SLF4J(日志框架)、CXF(Web服务)等。 8. **开发和部署**: 开发者可以使用Maven或Gradle等构建工具...

    apache-camel-2.9.2.tar.gz

    Apache Camel 是一个流行的开源集成框架,它用于构建企业级应用程序之间的数据路由和消息传递系统。在IT领域,尤其是在Java世界中,Apache Camel 被广泛应用于构建可复用的、灵活的集成解决方案,以连接各种不同的...

Global site tag (gtag.js) - Google Analytics