`

ActiveMQ 结合Spring进行数据同步

 
阅读更多
注意事项hibernate配置文件必须设置自动提交否则不能插入成功

配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xmlns:aop="http://www.springframework.org/schema/aop"
		xmlns:tx="http://www.springframework.org/schema/tx"
		xmlns:context="http://www.springframework.org/schema/context"
		xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
			http://www.springframework.org/schema/aop 
			http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
			http://www.springframework.org/schema/context 
			http://www.springframework.org/schema/context/spring-context-2.5.xsd"> 
		
    <!-- 配置activeMQ -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
        <property name="userName" value="system" />  
        <property name="password" value="manager" />  
    </bean>

    <!-- queue目的地配置 -->  
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg index="0" value="spring-queue" />  
    </bean>  
    <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="spring-topic" />  
    </bean>  
  
    <!-- spring 使用jmsTemplate来实现消息的发送和接受 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="jmsFactory"></property>  
        <property name="defaultDestination" ref="destinationTopic"></property>
	<!--pubSubDomain设置true则是Topic模式 false则是queue模式 -->
        <property name="pubSubDomain" value="true"/>
    </bean>
    
    <!--异步监听 -->  
    <bean id="myMessageListener" class="xxx.activemq.MyMessageListener">
    	  <property name="commonDao">
			<ref bean="commonDao" />
		</property>
    </bean> 
    <!-- 主题监听容器 {Topic模式} -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" lazy-init="false">
        <property name="connectionFactory" ref="jmsFactory" />  
        <property name="destination" ref="destinationTopic" />  
        <property name="messageListener" ref="myMessageListener" />
        <property name="receiveTimeout" value="1000000" />
    </bean>
    
</beans>



监听类

public class MyMessageListener implements MessageListener {
	
	private static final Log LOG = LogFactory.getLog(MyMessageListener.class);
	
	private CommonDao commonDao;
	
	public CommonDao getCommonDao() {
		return commonDao;
	}

	public void setCommonDao(CommonDao commonDao) {
		this.commonDao = commonDao;
	}

	@SuppressWarnings("all")
    public void onMessage(Message arg0) {
        try {
        	ObjectMessage message =(ObjectMessage) arg0;
        	System.out.println(message);
        	if(message != null){
            	HashMap msg = (HashMap)message.getObject();
            	this.handleMessage(msg.get("MSGBODY"), (String)msg.get("HANDLETYPE"));
            	LOG.info("ActiveMQ成功处理一条数据!类名---->"+msg.get("CLASSNAME"));
            }
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
	
	private void handleMessage(Object obj,String type){
		if(obj == null || type == null){
			LOG.error("ActiveMQ处理的消息体不能为空或处理类型不能为空!");
			throw new NullPointerException("ActiveMQ处理的消息体不能为空!");
		}
		if(HANDLETYPE.ADD.toString().equals(type)){
			this.commonDao.save(obj);
		}
		if(HANDLETYPE.UPDATE.toString().equals(type)){
			this.commonDao.update(obj);
		}
		if(HANDLETYPE.DELETE.toString().equals(type)){
			this.commonDao.delete(obj);
		}
	}
}


enum HANDLETYPE{
	ADD,UPDATE,DELETE
}



消息发送类

@Service("activemqMessageService")
@SuppressWarnings("all")
public class ActivemqMessageServiceImpl implements ActivemqMessageService{

	private static final Log log = LogFactory.getLog(DataProviderServiceImpl.class);
	@Resource
    private JmsTemplate jmsTemplate;
	
	@Override
	public void sendMessage(Object msgObj,String handleType,String className){
		String destination =  jmsTemplate.getDefaultDestination().toString();
		final HashMap<String, Object> map = new HashMap<String, Object>();
        map.put("HANDLETYPE", handleType);  
        map.put("MSGBODY", msgObj);
        map.put("CLASSNAME", className);
        System.out.println("向队列" +destination+ "发送了消息------------" + map);
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(map);
            }
        });
	}
	
}



分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    JMS+ActiveMQ+Spring 完整样例代码

    在这个"JMS+ActiveMQ+Spring 完整样例代码"中,我们将会探讨如何将这三者结合起来,实现一个简单的消息传递系统。以下是关键的知识点: 1. **JMS接口** JMS定义了两种主要的消息模型:点对点(Point-to-Point,P2P...

    C# ActiveMQ 和Spring.NET框架开发示例

    在开发中,开发者需要将ActiveMQ与C#应用程序结合,使用***框架来开发消息队列系统,这涉及到配置连接工厂、队列目的地、消息监听器容器等组件,以及编写消息消费者和生产者的代码逻辑。 在C#应用程序中使用***框架...

    ActiveMQ快速上手 PDF

    - **结合Spring的开发**:利用 Spring 的支持简化 ActiveMQ 的配置,并通过 Spring 管理连接和事务。 #### 五、ActiveMQ的Transport - **多种传输协议**:ActiveMQ 支持多种传输协议,如 TCP、SSL、NIO、UDP 等,每...

    spring+springmvc+mybatis+mongodb+ActiveMQ+CXF

    在分布式系统中,ActiveMQ常用于实现任务调度、事件通知和数据同步等功能,提高系统的响应速度和稳定性。 CXF是一个开源服务框架,它支持多种Web服务标准,如SOAP、RESTful等,可以方便地创建和消费Web服务。CXF...

    ActiveMQ与Zookeeper集群测试代码

    标题中的“ActiveMQ与Zookeeper集群测试代码”指的是一个实验或示例项目,旨在演示如何结合这两个组件来构建高可用的消息传递环境。Zookeeper在这里的角色可能是用来管理ActiveMQ集群的状态,实现节点间的选举和故障...

    springBoot2.0.1、zookeeper、dubbo、activemq、redis整合分布式架构

    总结来说,这个项目运用了 Java 技术栈,通过 SpringBoot 构建微服务,利用 Zookeeper 进行服务发现,借助 Dubbo 实现服务间通信,依赖 ActiveMQ 处理异步消息,以及使用 Redis 作为缓存和实现分布式锁,构建了一个...

    websocket+activemq.rar

    WebSocket 和 ActiveMQ...总结来说,WebSocket和ActiveMQ的结合使用,能够为基于SpringBoot的应用提供强大的实时通信能力,实现高效的数据同步和异步任务处理。这种技术方案在金融、电商、物联网等领域有着广泛的应用。

    spring-jms-4.3.4.RELEASE.zip

    3. **高可用性和容错**:消息中间件如ActiveMQ、RabbitMQ等,结合Spring JMS,可以构建高可用和故障切换的系统。 综上所述,Spring JMS 4.3.4.RELEASE为开发者提供了一套完整的JMS集成方案,通过抽象和封装JMS API...

    use sse in spring4.2

    在一个项目中,如果使用ActiveMQ与其他进程进行通信,当从消息队列接收到消息时,`onMessage()`方法会被触发。此时,我们可以使用Server-Sent Event将消息推送到浏览器。以下是如何实现这一目标的步骤。 首先,快速...

    MF00641-Spring Cloud分布式物联网(IOT)平台.zip

    在物联网(IoT)场景下,Spring Cloud结合以下技术可以实现高效的数据处理和设备管理: 1. **RabbitMQ/ActiveMQ**:消息队列,用于处理物联网设备产生的大量实时数据,实现异步通信和削峰填谷。 2. **Apache Kafka**...

    ActiveMQ项目实战

    总结起来,这个项目展示了如何利用ActiveMQ结合Solr进行实时数据同步,同时体现了面向服务的设计思想,通过Service层封装业务逻辑,Dao层处理数据操作,以及利用Spring进行容器管理。整个流程确保了商品信息的及时...

    SpingMVC+MongoDB+Redis 初步架构设计

    4. **数据一致性**:由于MongoDB和MySQL的数据模型和操作方式不同,需要关注数据的一致性问题,可能需要引入事件驱动或消息队列(如RabbitMQ)来确保数据同步。 5. **安全性和监控**:配置Spring Security进行权限...

    springboot整合mybatis+plus+avtiveMq+redis

    - 首先,引入对应的SpringBoot启动器依赖,如`spring-boot-starter-data-jpa`(MyBatis)、`spring-boot-starter-data-redis`(Redis)、`spring-boot-starter-activemq`(ActiveMQ)。 - 配置数据库连接、Redis...

    JavaEE企业级开发的面试题汇总

    JavaEE企业级开发面试题汇总...这些知识点构成了JavaEE企业级开发的基础,面试中通常会结合具体项目经验和实际问题进行深入探讨,以评估候选人的专业能力和实践经验。理解和掌握这些内容对于Java开发者来说至关重要。

    sockjs.min.js和stomp.min.js

    STOMP(Simple (or Streaming) Text Oriented Messaging Protocol)是简单文本导向消息协议,是一种轻量级的网络协议,常用于与消息中间件(如RabbitMQ、Apache ActiveMQ等)进行交互。在WebSocket场景下,STOMP可以...

    JMS完全实例(八个实例)

    **JMS完全实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准接口。...同时,结合Spring框架的使用,能让你更好地在实际工作中利用JMS实现高效、可靠的通信。

    YY面试题合集.pdf

    1. **元数据同步**:当一个新的节点加入集群时,它会同步所有现有的元数据。如果某个节点离开或出现故障,集群会自动重新分配该节点上的元数据,确保服务的连续性。 2. **集群模式的优势**:RabbitMQ 集群能够实现...

    十个java项目

    可能使用Unity或Cocos2d-x引擎,结合Java进行后端服务器开发,使用TCP/IP协议处理游戏数据传输。 10. **图书馆管理系统**:用于图书借阅、归还、查询等操作。关键技术包括条形码识别、图书分类算法、使用Spring框架...

    基于Jboss的jms编程

    结合Spring的JMS支持,开发者可以构建健壮、可扩展的应用,确保数据在分布式环境中的安全传输。 总的来说,基于Jboss的JMS编程涉及到Spring的集成、JNDI配置、连接工厂的创建、消息模板的使用,以及对JMS基本概念的...

    java简历模版1.docx

    - 参与舒心购电商系统项目,利用分布式架构、Dubbo、RabbitMQ等实现服务调用和缓存同步,使用Nginx进行负载均衡。 **项目经验** - 舒心购项目中,负责接口文档编写、商品信息管理、商品检索、秒杀、购物车等功能的...

Global site tag (gtag.js) - Google Analytics