一、序言
JMS 相关的东西已经出来了很久,本想使用阿里的rocketMQ 发现很多没遵循JMS 规范,暂时就用用activeMq,做一些常用的系统解耦 协同工作,这里还是和spring 进行集成,spring 和JMS 配合还是挺好的。
二、场景
A系统产生了一笔订单,那么我们其他B C 系统会拿到订单的基本信息,然后进行金额的计算 以及 用户资料的分析 等等操作,以前的方法是 写个定时任务 扫描最新订单,但是实时性 就没那么高了,而且随着需求得越来越多,导致新进来一笔订单,就会有N个任务进行扫描 分析,扩展很死板,维护也麻烦。
三、实例代码
3.1 为了简单,到http://activemq.apache.org/download.html 下载activeMq 的东西,这里用的版本是5.10。
3.2 暂时先用activemq 自带的服务器,测试先用P2P 1对1 的方式 ,看看效果。
spring xml 代码:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- jms 连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <!-- TCP异步传输 --> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /> </bean> <!-- 基本的bean模板 --> <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 链接工长 --> <property name="connectionFactory" ref="connectionFactory"/> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 --> <!-- DeliveryMode.PERSISTENT=2:持久--> <property name="deliveryMode" value="1" /> </bean> <!-- 队列的目的地描述 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 通过 构造 设定 队列的名字 --> <constructor-arg index="0" value="orderQueue"/> </bean> </beans>
消息发送者代码:
这里虚拟了一个订单类Order 表示一笔订单,和一个单独的线程Sender 进行消息发送,一共20条消息
import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.jms.*; import java.io.Serializable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by qiqiang on 2014/12/2. */ public class JmsPointSender { public static void main(String[] args) throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms.xml"); // 获得JMS 模板 JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate"); // 获得发送消息的目的地 Destination destination = (Destination)context.getBean("queueDestination"); // 这里模拟 单独另外的消息发送的线程 JmsSender sender = new JmsSender(jmsTemplate,destination); ExecutorService service = Executors.newFixedThreadPool(1); service.execute(sender); } } // 发送消息 的类 class JmsSender implements Runnable{ JmsTemplate jmsTemplate; Destination destination; JmsSender(JmsTemplate jmsTemplate, Destination destination) { this.jmsTemplate = jmsTemplate; this.destination = destination; } // 定义个消息发送的条数 static int i = 0; @Override public void run() { // 这里我们一秒发送一条消息的模式 for(;i<20;i++){ // 发送消息 jmsTemplate.send(destination, new MessageCreator() { // 这里的session 会有工厂自动创建 public Message createMessage(Session session) throws JMSException { Order order = new Order(i, "name"+i); // 消息分为很多种,有字符串 字节 对象等,这里我们使用对象 System.out.println("发送条数--------------------------"+i); return session.createObjectMessage(order); } }); } } } // 订单的属性 class Order implements Serializable { private int id; private String name; Order(int id, String name) { this.id = id; this.name = name; } public Order(){} public int getId() { return id; } public void setId(int id) { this.id = id; } @Override public String toString() { return "Order{" + "id=" + id + ", name='" + name + '\'' + '}'; } }
消息接收者的 类
import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import javax.jms.*; /** * Created by qiqiang on 2014/12/9. */ public class JmsReceiver { public static void main(String[] args) throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms.xml"); // 获得发送消息的目的地 Destination destination = (Destination)context.getBean("queueDestination"); // 获得JMS 模板 JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate"); int i = 0; ObjectMessage message = null; // 这里写个循环,一直接受消息到结束 while ( (message = (ObjectMessage)jmsTemplate.receive(destination)) != null){ System.out.println("收到消息条数:"+i++ +" "+message.getObject());; } } }
四、执行过程:
1.解压我们下下来的activemq,然后在apache-activemq-5.1.0\bin 目录下,windows 系统 直接运行activemq.bat,linux 应该有个activemq.sh 的启动项,总之我先运行这个broker,这个东西我们可以认为是消息的一个存放中心,一个中转站.运行成功了 可以从http://localhost:8161/admin/queues.jsp 地址提供的一个页面,里面有消息的信息。
2.执行JmsPointSender ,没出错的话,应该会看到消息发送了20条,并且在queues.jsp 里面能看到如下信息:orderQueue 是我们的Destination,20 就是我们发送的消息数。
3.启动JmsReceiver,就会看到消费了20条消息,上面的变化就看到0,1,20,20
表示消费1个,发送20条,消费20条,还剩0条,消费过程就结束了
五、Topic 订阅者消费模式
刚才我们是1对1 的消费模式,肯定不是不满足我们得需求的, 我们需要一笔订单需要 N 个消费者去处理,因此改变如下:
<!-- 消息订阅模式,在spring xml 里面加,这是发送的 Destination --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 订阅消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean>
由于这种属于发布订阅模式,因此消费者得一直监听,才能收到消息,我们先做一个消费者得监听器:
public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { // 很简单,我们直接打印 System.out.println("topic 收到消息:"+message); } }
然后spring 里面同样要配置监听的情况,为了模拟发布者 和 消费在不同的服务器,我们做写两个xml 文件:spring-jms-consumer.xml,内容除了监听,连接服务器的东西都一样
<!-- jms 连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> </bean> <!-- 消息订阅模式 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 订阅消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean> <!-- 消息监听,这里可以认为是A服务器的监听,这里特殊 --> <bean id="messageListener" class="com.xx.ConsumerMessageListener"/> <bean id="listenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer"> <!-- 工厂 目的地 监听器 这里如果用原始activemq 写,这些属性也是必要的 --> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="messageListener" /> </bean>
这里再看 发送者的代码
import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.jms.*; import java.io.Serializable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by qiqiang on 2014/12/9. */ public class JmsTopicSender { static int i = 1; public static void main(String[] args) throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms.xml"); // 获得JMS 模板,这里都差不多一样 JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate"); // 获得发送消息的目的地 Destination destination = (Destination)context.getBean("topicDestination"); // 发送消息 jmsTemplate.send( destination,new MessageCreator() { public Message createMessage(Session session) throws JMSException { Order order = new Order(i, "name"+i+":"+session); System.out.println("发送条数--------------------------"+i); return session.createObjectMessage(order); } }); } }
下面模拟是一个消费者的代码
import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; /** * Created by qiqiang on 2014/12/10. */ public class JmsTopicReceiver{ public static void main(String[] args) throws Exception { // 加载消费者监听 ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms-consumer.xml"); // 写个死循环,模拟服务器一直运行 while (true){} } }
最后先启动 消费者JmsTopicReceiver,然后再启动发送,这样模拟先监听 后发送的情况,如果先发送,而没监听 是收不到消息的。
小结:
1.上面只是最基本的测试,有问题再说...
2.关于queue 可以尝试下 发送10W跳数据看看会出现什么问题,也可以尝试下发送一半突然中断会出现什么问题?
3.关于topic 模式,假设我发送者发送消息的时候,有一台消费者恰好重启或者挂掉怎么办?仅仅持久化吗?
这些后面再解决!
相关推荐
通过这个示例,你可以了解到如何在Spring Boot应用中集成ActiveMQ,使用Queue和Topic进行消息传递,并了解连接池和消息确认机制。这些技术在分布式系统、微服务架构中广泛用于实现异步通信和解耦。
同时,消息中间件ActiveMQ作为Apache出品的一款开源JMS(Java Message Service)实现,被广泛用于处理异步通信和解耦系统组件。本示例将详细讲解如何在Windows环境下,利用Maven构建工具,将Spring与ActiveMQ进行...
本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们需要在Spring Boot项目中引入ActiveMQ的相关依赖。在`pom.xml`文件中添加以下Maven依赖: ```xml <groupId>org.spring...
1. **配置文件**:Spring的XML配置文件(如`applicationContext.xml`)会包含ActiveMQ的相关配置,如`ConnectionFactory`、目的地(Queue或Topic)以及消息监听器的设置。 2. **生产者代码**:这部分代码负责创建并...
在IT行业中,Spring框架是...1. **JMS配置**: 在Spring的配置文件(如applicationContext.xml)中,添加对JMS的支持,包括定义ConnectionFactory、Destination(Topic或Queue)和MessageListenerContainer。 ```xml ...
运行ActiveMQ_Demo中的示例项目,你可以看到Spring应用成功地与嵌入式的ActiveMQ服务器交互,实现了消息的发送和接收。这个完整的demo可以帮助开发者快速理解并实践Spring与ActiveMQ的集成,为构建高效、可靠的...
这个“springboot2整合activemq的demo”提供了一个实际的例子,帮助开发者理解如何在Spring Boot应用中使用ActiveMQ实现消息队列的功能。以下是关于这个主题的详细知识点: 1. **Spring Boot 2**: - Spring Boot ...
在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...
总结,通过SpringBoot整合ActiveMQ,我们可以轻松地在应用程序中实现消息的发布和订阅,利用点对点和主题订阅模式优化系统设计。同时,利用ActiveMQ的管理界面,我们可以直观地监控和管理消息队列,为系统的维护和...
ActiveMQ整合Spring的Demo是一个典型的Java企业级应用示例,它展示了如何在Spring框架中集成Apache ActiveMQ,以便实现消息队列的功能。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的Java消息服务...
通过配置`ConnectionFactory`和`Destination`(Queue或Topic),可以在`Spring`应用中发送和接收消息。 3. **Spring配置**:在`Spring`的XML配置文件中,我们需要定义`<bean>`来创建`ActiveMQConnectionFactory`,...
本初学使用DEMO将带你走进ActiveMQ的世界,通过队列(Queue)和主题(Topic)两种消息模型来了解其基本用法。 1. **ActiveMQ简介**: - ActiveMQ 是Apache软件基金会的一个项目,它提供了一个跨语言、跨平台的消息...
本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和主题(Topic)两种不同的通信方式。队列遵循“先进先出”原则,每个消息只有一个消费者;而主题支持多播,允许多个消费者同时接收消息。 首先,你需要...
这个"spring-jms-demo"项目显然是一个示例应用,它展示了如何在Spring环境中使用JMS来实现消息传递。让我们深入探讨Spring JMS的核心概念、功能以及如何在实际开发中应用。 首先,JMS是Java平台的一个标准接口,...
这个简单的Demo展示了如何使用Maven、Spring和ActiveMQ构建一个Topic模式的消息传递系统。实际应用中,你可以根据需求进行扩展,比如添加多个消费者,或者通过配置ActiveMQ服务器以实现高可用性、消息持久化等功能。...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循Java消息服务...了解这些知识点后,你可以构建一个健壮的消息驱动系统,利用ActiveMQ和Spring JMS实现组件间的异步通信,提高系统的稳定性和可扩展性。
本示例"ActiveMQ整合Spring的一个demo"旨在展示如何将这两个强大的工具结合在一起,以实现高效、可靠的消息传递。 Apache ActiveMQ是开源的JMS(Java Message Service)提供商,它允许应用程序之间通过异步通信进行...
1. **配置 Spring**:在 Spring 配置文件中定义 `ConnectionFactory` 和目的地 Bean,使用 `<jms:queue>` 或 `<jms:topic>` 标签。 2. **创建消息监听器**:使用 Spring 的 `<jms:listener-container>` 和 `...