之前虽然谢过类似的代码,但是脑子里还有那么一点不清楚,主要消费者生产者的感念,今天工作不忙,再次复习,那个点还是没过去,请教同事一点通了,呵呵!以前的理解还是有误差的,不过发表的文章没问题!
消费者:
activemq-consumer.xml配置如下:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="queueJmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
<property name="useAsyncSend" value="true" />
</bean>
<!-- ActiveMQ destinations -->
<!-- 使用Queue方式-->
<amq:queue name="QUEUE0" physicalName="JMS-TEST-QUEUE0" />
<amq:queue name="QUEUE1" physicalName="JMS-TEST-QUEUE1" />
<amq:queue name="QUEUE2" physicalName="JMS-TEST-QUEUE2" />
<!-- converter -->
<bean id="defaultMessageConverter" class="com.activemq.common.DefaultMessageConverter" />
<!-- Message Driven POJO (MDP) -->
<!-- consumer for queue -->
<bean id="queueConsumer0" class="com.activemq.consumer.QueueConsumer" scope="prototype"/>
<bean id="queueConsumer1" class="com.activemq.consumer.QueueConsumer" scope="prototype"/>
<bean id="queueConsumer2" class="com.activemq.consumer.QueueConsumer2" scope="prototype"/>
<!-- Message Listener for -->
<bean id="queueListener0"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer0" />
<!-- may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<bean id="queueListener1"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer1" />
<!-- may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<bean id="queueListener2"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer2" />
<!-- may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<!-- listener container,MDP无需实现接口 -->
<bean id="queueListenerContainer0"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="queueJmsConnectionFactory" />
<property name="destination" ref="QUEUE0" />
<property name="messageListener" ref="queueListener0" />
<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
<property name="receiveTimeout" value="20000" />
</bean>
<bean id="queueListenerContainer1"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="queueJmsConnectionFactory" />
<property name="destination" ref="QUEUE1" />
<property name="messageListener" ref="queueListener1" />
<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
<property name="receiveTimeout" value="20000" />
</bean>
<bean id="queueListenerContainer2"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="queueJmsConnectionFactory" />
<property name="destination" ref="QUEUE2" />
<property name="messageListener" ref="queueListener2" />
<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
<property name="receiveTimeout" value="20000" />
</bean>
</beans>
相应的QueueConsumer类代码(自定义的)
package com.activemq.consumer;
import org.apache.log4j.Logger;
import com.activemq.common.FooMessage;
/**
* Date: 2008-8-28
* Time: 17:10:34
*/
public class QueueConsumer {
int i;
Logger logger=Logger.getLogger(QueueConsumer.class);
public void receive(FooMessage message) {
i++;
System.out.println("*************************************** Queue : " + message.getId()+","+System.currentTimeMillis());
logger.debug("Queue: " + message.getId()+"-------count"+i+"------------------------------------------------>"+System.currentTimeMillis());
/*try {
Thread.sleep(60000);
} catch (InterruptedException e) {
logger.error("time out..............................................");
e.printStackTrace();
}*/
}
}
ok接着就可以测试了,测试类:
package com.activemq.consumer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumerTestMain {
public static void main(String[] args) {
BeanFactory bf = new ClassPathXmlApplicationContext("classpath:activemq-consumer.xml");
}
}
查看消息的发送情况
启动activemq服务器,打开http://localhost:8161/admin/选择QUEUE查看
你会发现我们启动了1个消费者,再次运行测试类消费者变为2,然后群殴们启动生成者
生产者的相关配置
activemq-producer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="queueJmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0"/>
<property name="useAsyncSend" value="true"/>
</bean>
<!-- ActiveMQ destinations -->
<!-- 使用Queue方式-->
<amq:queue name="QUEUE0" physicalName="JMS-TEST-QUEUE0" />
<amq:queue name="QUEUE1" physicalName="JMS-TEST-QUEUE1" />
<amq:queue name="QUEUE2" physicalName="JMS-TEST-QUEUE2" />
<!-- Spring JmsTemplate config -->
<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="queueJmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
<property name="receiveTimeout" value="20000"/>
</bean>
<!-- converter -->
<bean id="defaultMessageConverter" class="com.activemq.common.DefaultMessageConverter" />
<bean id="queueMessageProducer0" class="com.activemq.producer.QueueMessageProducer">
<property name="template" ref="queueJmsTemplate" />
<property name="destination" ref="QUEUE0" />
</bean>
<bean id="queueMessageProducer1" class="com.activemq.producer.QueueMessageProducer">
<property name="template" ref="queueJmsTemplate" />
<property name="destination" ref="QUEUE1" />
</bean>
<bean id="queueMessageProducer2" class="com.activemq.producer.QueueMessageProducer">
<property name="template" ref="queueJmsTemplate" />
<property name="destination" ref="QUEUE2" />
</bean>
</beans>
生成者类:QueueMessageProducer
package com.activemq.producer;
import javax.jms.Queue;
import org.springframework.jms.core.JmsTemplate;
import com.activemq.common.FooMessage;
public class QueueMessageProducer extends Thread {
private Queue destination;
private String qmid;
private JmsTemplate template;
public Queue getDestination() {
return destination;
}
public String getQmid() {
return qmid;
}
public JmsTemplate getTemplate() {
return template;
}
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
FooMessage fMessage = new FooMessage();
fMessage.setId(this.getQmid() + "-" + i);
this.send(fMessage);
}
}
public void send(FooMessage message) {
this.getTemplate().convertAndSend(this.getDestination(), message);
}
public void setDestination(Queue destination) {
this.destination = destination;
}
public void setQmid(String qmid) {
this.qmid = qmid;
}
public void setTemplate(JmsTemplate template) {
this.template = template;
}
}
测试类:
package com.activemq.producer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProducerTestMain {
public static void main(String[] args) {
BeanFactory bf = new ClassPathXmlApplicationContext("classpath:activemq-producer.xml");
QueueMessageProducer[] qms={
(QueueMessageProducer)bf.getBean("queueMessageProducer0"),
(QueueMessageProducer)bf.getBean("queueMessageProducer1"),
(QueueMessageProducer)bf.getBean("queueMessageProducer2")};
for (int i = 0; i < qms.length; i++) {
qms[i].setQmid(i+"");
qms[i].start();
}
}
}
启动测试类把你的生产者类中的run方法的循环次数的数字写大些比如向我这里一样写成10万,对应的你启动的activemq服务器你就明白了
不明白的可以喝我沟通,我们共同学习,以防我们学习过程中的不足
另外我要向大家说下我的误解:
<!-- 使用Queue方式-->
19. <amq:queue name="QUEUE0" physicalName="JMS-TEST-QUEUE0" />
20. <amq:queue name="QUEUE1" physicalName="JMS-TEST-QUEUE1" />
21. <amq:queue name="QUEUE2" physicalName="JMS-TEST-QUEUE2" /> 一直以来我误以为这段代码我们创建了三个消费者其实是消费者的三个消费管道,消费者要通过这样的管道接受消费者发送的消息!
分享到:
相关推荐
ActiveMQ作为Apache软件基金会的一个项目,是一款开源的消息中间件,支持多种协议,如AMQP、STOMP、MQTT等,并且与Java平台紧密结合。Spring框架则是一个广泛使用的Java企业级应用开发框架,提供了一整套的解决方案...
在Spring应用中,可以通过声明式的方式创建消息生产者和消费者。例如,可以创建一个`MessageProducer`类来发送消息: ```java import org.springframework.beans.factory.annotation.Autowired; import org.spring...
在实际应用中,`activeMQ_p2s`和`activeMQ_spring`这两个文件可能包含了示例代码,分别展示了生产者(Producer)和消费者(Consumer)的实现。生产者通常会使用`JmsTemplate`发送消息,而消费者则通过实现`Message...
2. **Spring集成ActiveMQ**:Spring通过`spring-jms`模块提供对JMS的支持。在Spring配置文件中(如`applicationContext.xml`),你需要定义一个`JmsTemplate`,它提供了一种简单的方式来发送和接收消息。同时,配置`...
在实际项目中,ActiveMQ与Spring的结合可以用于解耦应用程序组件,提高系统的可扩展性和可靠性。通过异步处理,你可以构建响应更快的应用,同时避免阻塞主线程。此外,消息队列还可以实现负载均衡,让多个消费者共享...
6. **运行实例**:确保ActiveMQ服务器正在运行,然后启动你的Spring应用。调用`sendMessage()`方法,消息将被发送到指定的队列。 以上就是ActiveMQ与Spring整合的基本流程。在实际项目中,你可能还需要处理事务管理...
将ActiveMQ与Spring整合,可以方便地在Spring应用中使用消息队列。 **ActiveMQ整合Spring的核心知识点:** 1. **Spring JMS支持**: Spring提供了JMS模块,通过`org.springframework.jms`包中的类和接口,简化了...
将 ActiveMQ 与 Spring 结合,可以方便地在 Spring 应用中集成消息队列,实现高效的消息传递。 **1. 安装与配置 ActiveMQ** 首先,你需要下载并安装 ActiveMQ。在官方网站上可以找到最新版本的下载链接。安装完成...
ActiveMQ-Spring项目是将ActiveMQ集成到Spring应用程序中的桥梁,它使得在Spring应用中配置和使用ActiveMQ变得简单易行。这个`activemq-spring-1.2.jar`库正是这个集成的实现,它包含了一系列的Spring Bean定义和...
首先,`activemq-spring-1.5.jar`是专门为Spring框架设计的ActiveMQ集成库,它使得在Spring应用中配置和使用ActiveMQ变得异常简单。这个版本1.5的库是在Spring框架成熟时期发布,旨在提供与早期Spring版本的良好兼容...
`activemq-spring-2.0.jar`是专门为Spring框架设计的ActiveMQ客户端库,使得在Spring应用中集成ActiveMQ变得更加简单。该库包含了一系列Spring Bean定义,允许开发者通过XML配置或Java配置来声明式地创建和管理...
ActiveMQ 和 Spring 的整合是企业级应用中常见的消息中间件解决方案,尤其在构建分布式系统时,两者结合能够提供高效、可靠的异步通信能力。ActiveMQ 是一个开源的消息代理,实现了 JMS(Java Message Service)规范...
`activemq-spring-1.4.jar`是专门为Spring框架设计的一个适配器,使得Spring应用能够无缝地集成ActiveMQ。这个库包含了Spring与ActiveMQ交互所需的类和配置,使得开发者可以通过Spring的IoC(Inversion of Control)...
首先,`activemq-spring-1.2_001.jar`是ActiveMQ与Spring框架集成的库文件,它允许我们在Spring应用环境中无缝地使用ActiveMQ。这个版本号`1.2_001`表示这是该组件的特定迭代,可能包含了对早期版本的改进和修复。`...
这个"activeMQ+spring+springmvc整合示例"旨在帮助开发者理解如何将这三者结合,以实现消息队列(Message Queue)的功能,提高系统的异步处理能力和负载均衡。以下是关于这一整合的详细知识讲解: 1. **ActiveMQ**:...
标题中的“ActiveMQ与Spring...综上所述,这个主题深入讲解了如何在Java开发环境中利用Maven构建工具,结合Spring框架和ActiveMQ实现消息传递功能。通过理解这些知识点,开发者可以构建出更健壮、可扩展的分布式系统。
将ActiveMQ与Spring结合,可以实现更高效、可扩展的应用程序。本篇文章将深入探讨如何整合ActiveMQ与Spring,并利用连接池提升性能。 首先,我们需要理解JMS(Java Message Service),它是Java平台中用于异步消息...
在IT行业中,消息传递系统...这种集成使得开发人员能够在Spring应用中轻松地利用ActiveMQ的功能,构建出高可用、可扩展的分布式系统。在实际项目中,这种集成方法能够提高系统的可靠性和性能,同时也降低了开发复杂度。
将ActiveMQ与Spring结合,可以轻松地在Spring应用中集成消息队列,实现高效、解耦的系统通信。 在这个"ActiveMQ5.1+Spring2.5 Demo"中,我们将深入探讨如何在Spring 2.5版本中配置和使用ActiveMQ,以及如何构建一个...
通过这种集成,Spring应用可以利用ActiveMQ提供的消息传递功能,实现异步处理、负载均衡和容错机制。这有助于提高系统的响应速度和整体性能,同时保持良好的可维护性和可扩展性。在实际应用中,你可以根据需求创建多...