一、序言
demo2 留下了两个问题:
1.我们利用demo2 的配置,在queue 模式下 连续发送10W消息出现出现状况。
2.topic 模式下,消费者重启时间段收不到监听的信息怎么办?
二、问题解析:
1.测试 发送10W消息,中途会出现
socket: tcp://localhost:61616 due to: java.net.BindException: Address already in use: JVM_Bind 异常。
你关掉activemq,利用netstat -aon | findstr "61616" 发现没有这个端口占用情况,查阅资料才知道:
http://activemq.apache.org/jmstemplate-gotchas.html
里面解释道,发送消息的时候创建connection,session 还要关闭,比较费资源,我猜测当创建销毁操作没测试完成的时候,另一个消息发送的时候,发现端口被占着,就会出现这个种情况,也就是说当发送频率比较高的情况,容易出现,文档建议用 pool 的东西。
一共有spring 的CachingConnectionFactory 和 activemq 的PooledConnectionFactory,由于PooledConnectionFactory 这东西要 activemq-pool.jar ,因此我还是选择CachingConnectionFactory~。~。
配置spring-jms.xml更改如下:
<!-- jms 连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- Session缓存数量,这里属性也可以直接在这里配置 --> <property name="sessionCacheSize" value="100" /> <!-- 基本的bean模板 --> <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 链接工厂,这里应用缓存池 就行了--> <property name="connectionFactory" ref="cachingConnectionFactory"/> </bean> </bean>
这样操作了时候,发送20W条信息也没出现过问题,而且速度比发送快了几十倍......
2.topic 模式下如果A 消费者挂了,就收不到消息了,而我又想它收到消息,我们先来尝试持久化吧!
1.对于持久化,并不默认,其实queue 默认就持久化在文件里面的,但是topic 模式下我们得开启持久化配置, 在activemq.xml 里面有这样的配置:
<!-- 这里设置true 就算开启了 --> <broker xmlns="http://activemq.apache.org/schema/core" persistent="true" brokerName="localhost" dataDirectory="${activemq.base}/data"> <!-- 这里是文件存放的位置,其他的说明暂时不讲 --> <persistenceAdapter> <amqPersistenceAdapter syncOnWrite="true" directory="${activemq.base}/data2" maxFileLength="3mb"/> </persistenceAdapter>
2.持久化我们在spring-jms.xml 里面还得开启几个东西:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- jms 连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 基本的bean模板 --> <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 链接工长 --> <property name="connectionFactory" ref="cachingConnectionFactory"/> <!-- 进行持久化 --> <property name="deliveryMode" value="2" /> <!--订阅 发布模式 --> <property name="pubSubDomain" value="true" /> </bean> <!-- 消息订阅模式 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 订阅消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean> </beans>
3.想想我们订阅者需要做些什么呢?发布者发布消息,订阅者去消费,这是1对多的形式,我们可以这样理解:公司设定很多活动代金卷,去参加活动的人都能领取,当然这分两种情况,第一种 就是我们前面测试的,只要我公司门口等(监听),活动开始(发布)就能领取了,如果你当时没在,就领取不到。第二种:很多情况下,公司搞活动我们不会等在那里,只要活动开始了,那么我过段时间也可以去,礼品公司会保留的,这种情况会导致多次领取,因此总要登记一下嘛,不能你领取了,过一会又来吧?activemq 里面会有clientId 标示来区分,类似于身份证ID嘛。
当然有些情况下, 我们一个ID 可以领取多个不同的奖品,因此还得需要个字段标示:durableSubscriptionName,标示我们领取哪个礼品,下面先看配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- jms 连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- 接收者ID --> <property name="clientId" value="clientA" /> </bean> <!-- 消息订阅模式 --> <bean id="topicCustomerA" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 订阅消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean> <!-- 消息监听,这里可以认为是A服务器的监听 --> <bean id="messageListener" class="com.raycloud.excalibur.mq.ConsumerMessageListener"/> <bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicCustomerA" /> <property name="messageListener" ref="messageListener" /> <!-- 持久化消息 --> <property name="subscriptionDurable" value="true"/> <!-- 接收者ID --> <property name="clientId" value="clientA" /> <!-- 这里名字可以任意改变,A 领取了,你可以改成B 还可以领取,可以举例不是很恰当 --> <property name="durableSubscriptionName" value="clientA"/> </bean> </beans>
// 这是消费者代码,这里你可以创建 多个XMl文件,模拟多个消费者。 public class JmsTopicReceiver{ public static void main(String[] args) throws Exception { // 加载消费者监听 ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms-consumerA.xml"); // 写个死循环,模拟服务器一直运行 while (true){} } }
// 监听代码 直接输出 public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("topic 收到消息:"+message); } }
三、测试:
我们要完成一个收到一个order,然后N个服务计算的问题,因此采用topic 模式,而防止中途服务器挂掉,采用持久化方式,模拟测试如下:
1.启动两个ConsumerA,ConsumerB 监听,发布一个order ,同时收到消息,OK
2.启动一个ConsumerA,发布一个Order,再启动ConsumerB,也收到消息,OK
3.启动一个ConsumerA,发布一个Order,A收到,关闭mq服务器,重启mq服务(前后),重启ConsumerB ,同样收到消息,OK。
4.启动两个ConsumerA,ConsumerB,发布order,A,B收到消息,重启A,B 不收重复消息,OK
好像基本能满足需求了,由于发送量 不会很大,频率不会很高,可以试用一下了。
那么新问题是:
1.如果A,B 收到消息后,topic 的消息怎么处理呢? 一直保存着吗? 如果可以清除,怎么清除,什么时候进行清除呢?文件的方式方便管理吗?
2.虽然消息发送过去了,对象信息怎么接受呢,当然会有消息转换器..
3.在queue 模式下,服务器断开了,怎么从新连接呢,如果服务器挂了,怎么切换到备用的呢?
小结:
1.这是初步尝试了下 topic 持久化到文件,当然也可以持久化到数据的,关于activemq 持久化以及介绍文章,可以参考:http://blog.csdn.net/xyw_blog/article/details/9128219 比较详细。
2.
相关推荐
此外,ActiveMQ支持多种协议和特性,如topic、持久化、事务消息等,可以根据项目需求进一步探索和利用。 这个简单的Demo展示了如何在Spring Boot中集成ActiveMQ进行消息接收。通过这种方式,你可以构建出一个可靠的...
在实际应用中,可能还需要考虑更多因素,例如消息的持久化、事务性消息、消息确认机制等。不过,这个基础配置已经足够启动你的学习之旅,逐步探索更高级的特性。通过不断地学习和实践,你将能够熟练掌握Spring与...
在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...
- 部署时,可能需要考虑ActiveMQ集群、持久化存储、安全性等因素。 通过深入理解和实践这个“springboot2-activemq”示例,开发者能够熟练掌握在Spring Boot应用中集成ActiveMQ的方法,从而在实际项目中充分利用...
7. **持久化与消息确认**:ActiveMQ支持消息持久化,即使服务器重启,未消费的消息也不会丢失。同时,你可以配置消息确认模式,例如自动确认、客户端确认或DUPS_OK确认,以平衡消息处理速度和可靠性。 8. **监控与...
实际应用中,你可以根据需求进行扩展,比如添加多个消费者,或者通过配置ActiveMQ服务器以实现高可用性、消息持久化等功能。此外,还可以通过调整JMS模板的配置,实现事务性消息处理或者设置消息优先级等高级特性。...
此外,ActiveMQ还提供了许多高级特性,如消息持久化、事务支持、消息优先级、消息组和死信队列等。这些特性使得ActiveMQ在高可用性和容错性方面表现出色,能适应各种复杂的业务场景。 综上所述,这个"activeMQ demo...
6. **消息持久化**:了解ActiveMQ如何实现消息的持久化,确保在服务重启后仍能恢复未处理的消息。 7. **事务管理**:学习如何在ActiveMQ中使用JMS事务,确保消息的可靠传递。 8. **消息筛选与路由**:理解ActiveMQ...
在实际项目中,可能还需要考虑更多高级特性,如事务支持、消息确认策略、消息持久化等,以满足不同业务场景的需求。同时,为了提升性能和高可用性,还可以考虑使用ActiveMQ的集群配置。 在`active-demo-master`这个...
2. **持久化**:ActiveMQ提供了多种持久化机制,包括文件系统、数据库等,确保消息在服务器重启后仍可恢复。 3. **协议支持**:除了JMS,ActiveMQ还支持STOMP、AMQP、MQTT等多种协议,以适应不同场景的需求。 **二...
在IT行业中,消息队列(Message Queue,MQ)是一种常用于解耦系统、实现异步处理和提升系统可扩展性的技术。...此外,还可以利用ActiveMQ的其他特性,如主题(Topic)、持久化、事务支持等,以满足更复杂的业务场景。
- ActiveMQ是Apache软件基金会下的一个开源消息中间件,支持多种协议如OpenWire、STOMP、AMQP、MQTT等,提供高可用性和持久化保证。 - 特点:跨语言支持、高性能、安全、可靠。 3. **Spring与ActiveMQ整合**: -...
在IT行业中,消息队列(Message Queue,简称MQ)是一种重要的...在实际开发中,你可以根据项目需求,利用ActiveMQ实现消息的持久化、消息确认、消息分发策略(如负载均衡)等功能,进一步提升系统的稳定性和性能。
SpringJMS和ActiveMQ结合,可以实现消息的持久化,即使消息服务器重启,也不会丢失未被消费的消息。 8. **事务管理** SpringJMS提供了事务支持,可以在发送或接收消息时启用JMS事务,确保消息传递的可靠性。 9. ...
2. **共享存储(Shared Store)**:使用共享的持久化存储,确保所有节点都能访问相同的消息。 3. **虚拟主题(Virtual Topics)**:提供广播和订阅模型的结合,允许多个消费者同时接收消息。 五、Spring集群示例 `...
3. **与 Spring 集成**:ActiveMQ 可轻松嵌入到使用 Spring 框架的系统中,并且支持 Spring 2.0 的特性,简化了在 Spring 应用中的部署和配置。 4. **高效的消息持久化**:ActiveMQ 支持通过 JDBC 和 journal 实现...
**ActiveMQ入门详解** ActiveMQ是Apache组织开发的一款开源的消息中间件,它是Java ...随着你对ActiveMQ的深入学习,你将能更好地理解和利用它的特性,如持久化、事务、优先级和时间戳等,以优化你的分布式系统。
Kafka以其持久化、可扩展性和实时性著称。集成Kafka需要添加Kafka的相关依赖,并配置Bootstrap Servers等参数。使用`@KafkaListener`可以创建消息消费者,而`KafkaTemplate`则用于消息的发送。Kafka还支持Partition...