通过Spring对ActiveMQ进行配置开发,发布订阅模式,支持消息的持久化。
需要Spring2.5版本以上,如果有多个订阅者,每个订阅者需要指定不同的 clientId 。
发布者的配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <!-- 配置JMS连接工厂 --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="10" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="tcp://localhost:61616" /> <!-- 是否异步发送 --> <property name="useAsyncSend" value="true" /> </bean> </property> </bean> <!-- 发送消息的目的地(一个主题) --> <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息主题的名字 --> <constructor-arg index="0" value="Online.Notice.Topic" /> </bean> <!-- 配置JMS模版 --> <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="myConnectionFactory" /> <property name="defaultDestination" ref="myDestination" /> <!-- 订阅发布模式 --> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean> </beans>
发布者的代码:
package com.xikang.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class SimpleJMSSender { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-send.xml"); JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("myJmsTemplate"); for (int i = 0; i < 10; i++) { jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage msg = session.createTextMessage(); // 设置消息属性 msg.setStringProperty("phrCode", "C001"); // 设置消息内容 msg.setText("Hello World!"); return msg; } }); } } }
订阅者的配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <!-- 配置JMS连接工厂 --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="10" /> <!-- 接收者ID --> <property name="clientId" value="client_119" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> </property> </bean> <!-- 发送消息的目的地(一个主题) --> <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息主题的名字 --> <constructor-arg index="0" value="Online.Notice.Topic" /> </bean> <!-- 生产消息配置 (自己定义)--> <bean id="myTopicConsumer" class="com.xikang.jms.SimpleJMSReceiver" /> <!-- 消息监听器 --> <bean id="myTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="myTopicConsumer" /> <!-- 接收消息的方法名称 --> <property name="defaultListenerMethod" value="receive" /> <!-- 不进行消息转换 --> <property name="messageConverter"><null/></property> </bean> <!-- 消息监听容器 --> <bean id="myListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory" /> <!-- 发布订阅模式 --> <property name="pubSubDomain" value="true"/> <!-- 消息持久化 --> <property name="subscriptionDurable" value="true"/> <property name="receiveTimeout" value="10000"/> <!-- 接收者ID --> <property name="clientId" value="client_119" /> <property name="durableSubscriptionName" value="client_119"/> <property name="destination" ref="myDestination" /> <property name="messageListener" ref="myTopicListener" /> </bean> </beans>
订阅者的代码:
package com.xikang.jms; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.JmsException; public class SimpleJMSReceiver { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-receive.xml"); while(true) { } } public void receive(TextMessage message) throws JmsException, JMSException { System.out.println(message.getStringProperty("phrCode")); System.out.println(message.getText()); } }
相关推荐
1. **配置ActiveMQ**:在开始之前,我们需要在本地或者远程部署一个ActiveMQ服务器。配置文件通常为`activemq.xml`,在这里可以设置broker(消息代理)的属性,如端口、存储策略等。在Spring应用中,我们可以通过...
6. **实现消息的发布订阅模式**:在ActiveMQ中,有两种消息传递模式:点对点(Queue)和发布订阅(Topic)。点对点模式下,每个消息只有一个消费者;发布订阅模式下,一个消息可以被多个消费者(订阅者)接收。在...
发布/订阅模式下,我们使用主题而不是队列。在配置文件中更改`destination`类型为`ActiveMQTopic`: ```xml <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic" factory-method="create...
在发布/订阅模式中,消息发送者称为发布者(publisher),消息接收者称为订阅者(subscriber)。发布者将消息发送到主题(topic),而订阅者监听这个主题,以接收消息。在这种模式下,发布者和订阅者之间的耦合度...
标题中的“activemq-3:使用spring配置activemq的发布订阅模式的示例”表明我们将探讨如何在Apache ActiveMQ中使用Spring框架配置发布/订阅(Pub/Sub)模式。ActiveMQ是流行的开源消息代理,它实现了Java消息服务...
* `spring.jms.pub-sub-domain=true`:指定是否使用发布订阅模式。 * `spring.activemq.user=wienerspring.activemq.password=wiener`:指定用户名和密码。 * `spring.activemq.pool.enabled=true`:指定是否使用...
- **配置ActiveMQ**:在Spring应用中,我们需要添加ActiveMQ的相关依赖,并在配置文件中定义ConnectionFactory和Destination。ConnectionFactory是连接到消息代理的工厂,Destination则是消息的目标,可以是Queue或...
3. **ActiveMQ配置**:在Spring中配置ActiveMQ,我们需要在`applicationContext.xml`或对应的配置文件中添加ActiveMQ的连接工厂和目的地(Topic或Queue)。这通常包括URL、用户名、密码等信息,以连接到ActiveMQ...
接下来,**Spring与ActiveMQ的集成**:在Spring应用中配置ActiveMQ,通常需要在Spring配置文件中定义`JmsTemplate`和`DefaultMessageListenerContainer`。`JmsTemplate`用于发送消息,而`...
这两个目录下的代码分别展示了如何创建和使用发布/订阅模式。 为了使系统更加健壮,我们还可以配置ActiveMQ的连接工厂,设置重试策略,以及处理消息确认和事务等高级特性。此外,通过Spring Boot的Actuator模块,...
主题则支持发布/订阅模式,多个订阅者可以接收相同的消息。 3. **JmsTemplate**: Spring提供的一个便捷工具类,用于发送和接收消息。通过它可以简化JMS操作,例如发送文本、对象或文件消息。 4. **MessageListener...
而在发布/订阅模式中,消息被发布到主题,多个订阅者可以接收到消息。 接下来,我们看看如何在Spring中配置ActiveMQ。Spring框架提供了一套完整的JMS支持,包括连接工厂、目的地(队列或主题)以及消息监听器的声明...
它支持多种协议,如TCP/IP、HTTP、HTTPS等,并且可以处理多种消息类型,如点对点(Queue)、发布/订阅(Topic)模式。 3. **Spring与ActiveMQ整合**:整合Spring和ActiveMQ的主要目的是利用消息队列实现服务间的...
在Spring框架中,我们可以通过使用Spring的JMS模块来配置ActiveMQ。这个过程涉及到以下几个关键点: 1. **JMS配置**:在Spring的配置文件中,我们需要定义一个`ConnectionFactory`,它是与消息代理(如ActiveMQ)...
3. **配置Spring**:在Spring配置文件中定义JMS连接工厂、目的地(Topic或Queue)、消息生产者和消费者。 4. **编写生产者代码**:创建一个类,使用Spring的JmsTemplate发送消息到目的地。 5. **编写消费者代码**:...
适用于广播或发布/订阅模式。 5. **事务与持久化** Spring JMS支持事务管理,可以确保消息在事务提交后才真正发送,或者在事务回滚时撤销发送。此外,ActiveMQ提供持久化存储,即使服务器重启,未消费的消息也不会...
Spring框架作为Java企业级应用的事实标准,提供了与ActiveMQ集成的便利,使得开发者可以轻松地在Spring应用中实现消息队列和发布/订阅模式的通信。 本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和...
ActiveMQ 5是其一个版本,提供了多种协议支持,如TCP、SSL、NIO等,以及各种消息模式,如点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)等。它能够确保消息的可靠传输,并允许在分布式环境中进行高效的...
Spring Boot默认使用自动确认模式,即消费者在接收到消息后自动确认。如果需要手动确认,可以设置`acknowledge-mode`为`MANUAL`: ```properties spring.jms aktivemq.listener.acknowledge=manual ``` 在手动确认...