`
wiselyman
  • 浏览: 2094620 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
博客专栏
Group-logo
点睛Spring4.1
浏览量:82385
74ae1471-94c5-3ae2-b227-779326b57435
点睛Spring MVC4...
浏览量:130826
社区版块
存档分类
最新评论

spring integration同步数据库数据

阅读更多

 需求为:当客户已有系统的数据被同步到我方数据库后,若再有新数据,只同步新数据到我方数据库。

解决:因为客户的业务表是不能变动的,我方在客户数据库中新建一状态表,记录哪些数据被更新过。

当客户业务表有新数据插入时,用触发器将新数据id插入到状态表。

 

为方便实例:业务表pp,状态表status

结构为:

pp:

CREATE TABLE `pp` (
  `name` varchar(255) default NULL,
  `address` varchar(255) default NULL,
  `id` int(11) NOT NULL auto_increment,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

status:

CREATE TABLE `status` (
  `id` int(11) NOT NULL auto_increment,
  `status` varchar(255) default 'new',
  `ppid` int(11) NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;

触发器:

DROP TRIGGER if EXISTS mytrigger
CREATE TRIGGER mytrigger after INSERT on pp
for EACH ROW
BEGIN
 INSERT into `status`(ppid) values(new.id);
END;

 

核心配置:jdbc-inbound-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xmlns:context="http://www.springframework.org/schema/context" 
       xmlns:int="http://www.springframework.org/schema/integration" 
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"    
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms" 
       xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-3.0.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd 
    http://www.springframework.org/schema/integration/jdbc 
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd 
    http://www.springframework.org/schema/jdbc 
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
     http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">
    <context:component-scan base-package="com.wisely.inbound"/>
     
	<int:channel id="target"/>
	
	<int-jdbc:inbound-channel-adapter channel="target" 
					data-source="dataSource"
					query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"
					update="update status as st set st.status='old' where ppid in (:ppid)"
									   >
		<!-- 每隔多少毫秒去抓取 -->
		<int:poller fixed-rate="5000" >
			<int:transactional/>
		</int:poller>
		<!--  指定时刻抓取
		<int:poller max-messages-per-poll="1">
			<int:transactional/>
			<int:cron-trigger expression="0 0 3 * * ?"/>
		</int:poller>
		-->
	</int-jdbc:inbound-channel-adapter>
	<int:service-activator input-channel="target" ref="jdbcMessageHandler"/>	 
	 <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>
	 <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">
        <property name="driverClassName" value="${database.driverClassName}"/>
        <property name="url" value="${database.url}"/>
        <property name="username" value="${database.username}"/>
        <property name="password" value="${database.password}"/>
    </bean>   
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    	<property name="dataSource" ref="dataSource"/>
    </bean>    
   </beans>

 

JdbcMessageHandler:

 

package com.wisely.inbound.jdbc;

import java.util.List;
import java.util.Map;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class JdbcMessageHandler {
	@ServiceActivator
	public void handleJdbcMessage(List<Map<String ,Object>> message){
		for(Map<String,Object> resultMap:message){
			System.out.println("组:");
			for(String column:resultMap.keySet()){
				System.out.println("字段:"+column+" 值:"+resultMap.get(column));
			}
		}
	}
}

 

测试类:

package com.wisely.inbound.jdbc;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JdbcInbound {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		  ApplicationContext context = 
	                new ClassPathXmlApplicationContext("/META-INF/spring/jdbc-inbound-context.xml");
	}

}

 

 

若将channel改为jms的通道。配置文件做以下修改:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xmlns:context="http://www.springframework.org/schema/context" 
       xmlns:int="http://www.springframework.org/schema/integration" 
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"    
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms" 
       xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-3.0.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd 
    http://www.springframework.org/schema/integration/jdbc 
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd 
    http://www.springframework.org/schema/jdbc 
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
     http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">
    <context:component-scan base-package="com.wisely.inbound"/>
     
	<int-jms:channel id="target"  queue-name="jdbc.queue" connection-factory="connectionFactory"/>
	
	<int-jdbc:inbound-channel-adapter channel="target" 
									  data-source="dataSource"
									  query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"
									  update="update status as st set st.status='old' where ppid in (:ppid)"
									   >
		<!-- 每隔多少毫秒去抓取 -->
		<int:poller fixed-rate="5000" >
			<int:transactional/>
		</int:poller>
		<!--  指定时刻抓取
		<int:poller max-messages-per-poll="1">
			<int:transactional/>
			<int:cron-trigger expression="0 0 3 * * ?"/>
		</int:poller>
		-->
	</int-jdbc:inbound-channel-adapter>
	<!--  
	<int-jms:message-driven-channel-adapter id="queInbound" destination-name="jmsQueue" channel="target"/>
	-->
	<int:service-activator input-channel="target" ref="jdbcMessageHandler"/>
	 
	 <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>
	 <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">
        <property name="driverClassName" value="${database.driverClassName}"/>
        <property name="url" value="${database.url}"/>
        <property name="username" value="${database.username}"/>
        <property name="password" value="${database.password}"/>
    </bean>
    
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    	<property name="dataSource" ref="dataSource"/>
    </bean>
    
    
    <bean id="activeMqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    	<property name="brokerURL" value="vm://localhost" />
    </bean>
    
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    	<property name="sessionCacheSize" value="10" />
    	<property name="cacheProducers" value="false"/>
    	<property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    	<property name="connectionFactory" ref="connectionFactory"/>
    	<property name="defaultDestinationName" value="jmsQueue" />
    </bean>
</beans>

 

 

分享到:
评论

相关推荐

    spring integration master

    Spring Integration 是一个基于 Spring 框架的轻量级企业级集成库,它提供了一种声明式的方法来处理系统间的消息传递和数据流。这个"spring integration master"压缩包文件很可能是包含了一系列示例,帮助开发者更好...

    spring-integration2.03

    Spring Integration 是 Spring 框架的一个扩展,旨在提供轻量级、企业级的集成解决方案,它简化了应用程序之间的数据传输,支持多种协议和消息传递模式。在本文中,我们将深入探讨 Spring Integration 2.0.3 版本的...

    Spring Integration 中的新增功能

    Spring Integration 提供了多种适配器,使得与外部系统(如数据库、文件系统、邮件服务器等)的交互变得简单易行。 **Spring Integration 的新增功能** 1. **通道增强** 新版本可能增加了对不同类型通道的支持,...

    spring-integration

    2. **连接器(Adapters)**:连接器是Spring Integration与外部系统交互的桥梁,比如文件系统、数据库、HTTP服务器、JMS队列等。它们将Spring Integration的内部消息模型转换为特定系统的协议和格式。 3. **消息...

    Pro Spring Integration

    - **集成模式**:本书介绍了标准的集成模式,如消息传递、文件传输、数据库同步等,这些模式在构建集成系统时非常有用。 - **企业级挑战**:讨论了企业在进行集成时面临的常见挑战,例如异构系统之间的兼容性问题、...

    spring集成example

    Spring Integration是Spring框架的一个扩展,旨在提供一个简单、声明式的机制来处理应用之间的集成问题,如数据传输、消息处理等。它支持多种集成模式,如文件系统、数据库、HTTP、JMS、FTP、SMTP等。 下面我们将...

    spring-int-rest:这是使用Spring Integration for REST API的Spring引导应用程序

    通过提供一系列预定义的连接器,Spring Integration使得开发者能够轻松地实现不同系统间的交互,例如文件系统、消息队列、数据库以及HTTP服务等。 在"spring-int-rest"项目中,我们看到的是一个基于Spring Boot的...

    Spring Batch in Action英文pdf版

    Spring Batch提供了多种内置的ItemReader和ItemWriter实现,支持从数据库、文件等多种数据源读取数据,以及将数据写入到数据库、文件等目的地。 知识点五:数据处理 在数据被读取之后,通常需要经过一定的处理才能...

    spring-integration:您可以看到我们如何使用 spring 集成模块

    通道是 Spring Integration 中的数据传输途径,它们负责在不同的组件之间传递消息。标准输入通道适配器用于接收来自外部系统的数据,而标准输出通道适配器则负责将数据发送到目标系统。这种设计模式使得系统能够灵活...

    spring-5.3.9-dist.zip(spring-framework-5.3.9)

    Spring框架由多个模块组成,如Core Container(核心容器)、Data Access/Integration(数据访问/集成)、Web、AOP(面向切面编程)、Test等。每个模块都有相应的jar包,例如`spring-context.jar`提供了上下文支持,`...

    ygg-client-web-0.13.2.zip

    此外,它还可以用于系统之间的数据同步,如数据库同步、文件系统与Web服务之间的数据迁移等。另外,对于物联网(IoT)应用,Spring Integration的扩展组件可以处理传感器数据的接收和处理,实现设备与云端的无缝连接。...

    infinispan-cachestore-jdbc-8.2.3.Final.zip

    Infinispan通过JDBC Cache Store插件,支持多种数据库,如MySQL、Oracle等,用户可以自定义数据存储策略,比如批处理、同步异步模式等。 在配置Infinispan的JDBC Cache Store时,我们需要设置数据源、表结构以及...

    spring blob

    在处理Blob数据时,可以使用其提供的`BlobHandler`和`BlobStore`接口来实现文件系统、云存储服务(如Amazon S3或Google Cloud Storage)与数据库之间的数据同步。 6. **持久化策略**: 对于Blob数据的持久化,有直接...

    JAVA+SPRING全掌握

    9. **Spring Integration**: 集成外部系统,提供消息驱动的编程模型。 **学习路径** 1. 先掌握Java基础,理解面向对象编程思想。 2. 学习Spring核心模块,如DI和AOP,通过实际项目练习。 3. 探索Spring Boot和...

    官方原版完整包 spring-framework-5.3.0.RELEASE.zip

    1. **模块化设计**:Spring Framework 5.3.0 仍然保持其模块化的结构,包括核心容器(Core Container)、数据访问/集成(Data Access/Integration)、Web、AOP(面向切面编程)和测试模块等。这使得开发者可以根据...

    Spring Batch参考文档中文版

    通过这个示例可以了解如何配置Spring Batch来实现数据导入任务,包括配置`ItemReader`读取CSV文件中的数据,配置`ItemWriter`将数据写入MySQL数据库。 #### 七、Spring Batch 3.0新特性 1. **JSR-352支持**:增加...

Global site tag (gtag.js) - Google Analytics