- 浏览: 125016 次
- 性别:
- 来自: 广州
文章分类
最新评论
-
xiaoyao3857:
...
openstack dashboard简体中文汉化 -
yuky1327:
你看看系统重启之后还正常不,我们研究的时候是win7重启之后 ...
VMWare上安装OpenStack -
dotapinkcat:
2. Network ConfigurationEdit th ...
VMWare上安装OpenStack -
daduedie:
嗯,不错~~~
VMWare上安装OpenStack
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example.activemq</groupId> <artifactId>activemq-test</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>activemq-test</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <org.springframework.version>3.1.1.RELEASE</org.springframework.version> </properties> <repositories> <repository> <id>kxcomm-maven</id> <name>Maven kxcomm Repository</name> <url>http://122.13.0.56:8088/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2</version> </dependency> <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> <version>1.6</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.8.3</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-asm</artifactId> <version>${org.springframework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${org.springframework.version}</version> </dependency> <dependency> <groupId>com.davidkarlsen.commonstransaction.spring</groupId> <artifactId>commons-transaction-spring</artifactId> <version>0.9</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.2-beta1</version> </dependency> <dependency> <groupId>fastutil</groupId> <artifactId>fastutil</artifactId> <version>5.0.9</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>0.9.27</version> <scope>compile</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>0.9.27</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.6.1</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency> </dependencies> </project>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd" default-autowire="byName" default-lazy-init="true"> <import resource="activemq-test.xml"/> </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <!-- 创建工厂连接 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="rantzDestination" /> </bean> <!-- Point-to-Point --> <!-- activeMQ消息目标 队列 --> <bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg> </bean> <!-- activeMQ消息目标 主题--> <!-- <bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQTopic">--> <!-- <constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg>--> <!-- </bean>--> <bean id="producer" class="activemq.test.p2p.producer.RantzMarketingGatewayImpl"> <property name="jmsTemplate" ref="jmsTemplate" /> <property name="destination" ref="rantzDestination" /> </bean> <bean id="consumer" class="activemq.test.p2p.consumer.MarketingReceiverGatewayImpl"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean> <!-- Point-to-Point End--> <!-- Topic --> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor"> <constructor-arg index="0" value="kxcomm.mms.topic" /> </bean> <bean id="control" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor"> <constructor-arg index="0" value="kxcomm.mms.control" /> </bean> <bean id="myListener" class="activemq.test.topic.MyListener"> <property name="connectionFactory" ref="connectionFactory" /> <property name="topic" ref="topic" /> <property name="control" ref="control" /> </bean> <bean id="myPublisher" class="activemq.test.topic.MyPublisher"> <property name="connectionFactory" ref="connectionFactory" /> <property name="topic" ref="topic" /> <property name="control" ref="control" /> </bean> <!-- Topic End--> </beans>
package activemq.test.model; import java.io.Serializable; public class User implements Serializable{ private static final long serialVersionUID = -3098636047897519268L; private String name; private String sex; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "User [name=" + name + ", sex=" + sex + ", age=" + age + "]"; } }
PTP模型
PTP(Point-to-Point)模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包含各种消息,JMS Provider 提供工具管理队列的创建、删除。JMS PTP 模型定义了客户端如何向队列发送消息,从队列接收消息,浏览队列中的消息。
package activemq.test.p2p.consumer; import org.springframework.jms.core.JmsTemplate; import activemq.test.model.User; public class MarketingReceiverGatewayImpl { private JmsTemplate jmsTemplate; public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public MarketingReceiverGatewayImpl() { } public void receiveMotorist() throws Exception{ User message = (User)jmsTemplate.receiveAndConvert(); System.out.println("reviced msg is:" + message.toString()); } }
package activemq.test.p2p.consumer; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class StartConsumer { public static void main(String[] args) { /*开始加载spring配置文件*/ ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml"); MarketingReceiverGatewayImpl rantzMarketingGateway= (MarketingReceiverGatewayImpl) context.getBean("consumer"); System.out.println("Receive Start ..."); try { while(true){ rantzMarketingGateway.receiveMotorist(); } } catch (Exception e) { e.printStackTrace(); } } }
package activemq.test.p2p.producer; public interface IRantzMarketingGateway { /** * * 发送文本对象 * * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */ public void sendMotoristInfo(); /** * * 发送对象 * * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */ public void sendObjectInfo(); }
package activemq.test.p2p.producer; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import activemq.test.model.User; public class RantzMarketingGatewayImpl implements IRantzMarketingGateway { private JmsTemplate jmsTemplate; private Destination destination; public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getDestination() { return destination; } public void setDestination(Destination destination) { this.destination = destination; } public void sendMotoristInfo() { MessageCreator msg = new MessageCreator(){ public Message createMessage(Session session) throws JMSException { return session.createTextMessage("这是一个测试,"+System.currentTimeMillis()); } }; jmsTemplate.send(destination, msg); } public void sendObjectInfo() { User u = new User(); u.setAge(17); u.setName("yuky"+System.currentTimeMillis()); u.setSex("女"); jmsTemplate.convertAndSend(u); } }
package activemq.test.p2p.producer; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class StartProducer { public static void main(String[] args) { /*开始加载spring配置文件*/ ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml"); IRantzMarketingGateway rantzMarketingGateway= (RantzMarketingGatewayImpl) context.getBean("producer"); for(int i=0;i<10;i++){ rantzMarketingGateway.sendObjectInfo(); System.out.println("Start ..."); } } }
PUB/SUB模型
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。
package activemq.test.topic; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import activemq.test.model.User; public class MyListener implements MessageListener { private ActiveMQConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageProducer producer; private Topic topic; private Topic control; public Topic getTopic() { return topic; } public void setTopic(Topic topic) { this.topic = topic; } public Topic getControl() { return control; } public void setControl(Topic control) { this.control = control; } public ActiveMQConnectionFactory getConnectionFactory() { return connectionFactory; } public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public void onMessage(Message message) { try{ if (checkText(message, "SHUTDOWN")) { try { connection.close(); System.out.println("退出监听消息"); } catch (Exception e) { e.printStackTrace(System.out); } } else if (checkText(message, "REPORT")) { // send a report: try { System.out.println("MyListener->收到 a report"); long time = System.currentTimeMillis(); String msg = "MyListener->返回 a report :" + time + "ms"; System.out.println(msg); producer.send(session.createTextMessage(msg)); } catch (Exception e) { e.printStackTrace(System.out); } } else { ObjectMessage obj = (ObjectMessage)message; User u = (User) obj.getObject(); System.out.println("Received messages."+ u.toString()); } }catch(Exception e){ } } public void run() throws JMSException { if(connectionFactory!=null){ System.out.println("connectionFactory is ok"); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(this); connection.start(); producer = session.createProducer(control); System.out.println("Waiting for messages..."); } } private static boolean checkText(Message m, String s) { try { return m instanceof TextMessage && ((TextMessage)m).getText().equals(s); } catch (JMSException e) { e.printStackTrace(System.out); return false; } } }
package activemq.test.topic; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class StartListener { public static void main(String[] args) { /*开始加载spring配置文件*/ ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml"); MyListener myListener= (MyListener) context.getBean("myListener"); try { if(myListener!=null){ System.out.println("success..."); } myListener.run(); } catch (Exception e) { e.printStackTrace(); } } }
package activemq.test.topic; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import activemq.test.model.User; public class MyPublisher implements MessageListener { private ActiveMQConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageProducer publisher; private Topic topic; private Topic control; private final Object mutex = new Object(); public ActiveMQConnectionFactory getConnectionFactory() { return connectionFactory; } public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public Topic getTopic() { return topic; } public void setTopic(Topic topic) { this.topic = topic; } public Topic getControl() { return control; } public void setControl(Topic control) { this.control = control; } public void onMessage(Message message) { synchronized (mutex) { System.out.println("Received report " + getReport(message) ); } } Object getReport(Message m) { try { return ((TextMessage)m).getText(); } catch (JMSException e) { e.printStackTrace(System.out); return e.toString(); } } public void publish() throws Exception { User u = new User(); u.setAge(17); u.setName("yuky"+System.currentTimeMillis()); u.setSex("女"); // send events ObjectMessage obj = session.createObjectMessage(); obj.setObject(u); for (int i = 0; i < 10; i++) { publisher.send(obj); publisher.send(session.createTextMessage("REPORT")); } } public void run() throws Exception { connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); publisher = session.createProducer(topic); publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); session.createConsumer(control).setMessageListener(this); connection.start(); } public void stop() throws JMSException{ publisher.send(session.createTextMessage("SHUTDOWN")); connection.stop(); connection.close(); } }
package activemq.test.topic; import javax.jms.JMSException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class StartPublisher { public static void main(String[] args) throws InterruptedException { /*开始加载spring配置文件*/ ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml"); MyPublisher publisher= (MyPublisher) context.getBean("myPublisher"); try { publisher.run(); publisher.publish(); } catch (Exception e) { try { publisher.stop(); } catch (JMSException e1) { e1.printStackTrace(); } e.printStackTrace(); } } }
相关推荐
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS ...1、点对点(queue) 2、一对多(topic)
- `ActiveMQ-service.xml`和`ActiveMQ-client.xml`:这些XML配置文件用于定义ActiveMQ服务器和客户端的相关设置,包括连接工厂、目的地(Queue或Topic)、认证和网络配置等。 - `ActiveMQ.xml`:这是ActiveMQ...
- **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...
7. **消息模式**:ActiveMQ支持点对点(Queue)、发布/订阅(Topic)等多种消息模式,满足不同场景的需求。 8. **监控与调试**:通过Web控制台(默认端口8161)可以实时查看和管理消息队列,包括查看队列长度、消息...
- JMS提供了两种基本的消息模型:点对点(Queue)和发布/订阅(Topic)。 - 点对点模型中,每个消息仅由一个消费者接收,适合实现工作队列。 - 发布/订阅模型中,多个消费者可以订阅同一个主题,消息被广播给所有...
在本文中,我们将深入探讨如何使用Spring框架与Apache ActiveMQ集成,实现点对点通信(Point-to-Point)和发布/订阅(Publish/Subscribe)模式的通信。ActiveMQ是流行的开源消息中间件,它允许应用程序之间异步传输...
在 ActiveMQ 中,点对点(P2P)模式是基于队列(Queue)的一种通信方式。在这种模式下,消息生产者发送消息到队列,而消息消费者从队列中接收消息。队列具有先进先出(FIFO)的特性,即每个消息只能被一个消费者消费...
在"spring配置activemq详解"这个主题中,我们将探讨如何在Spring项目中配置和使用ActiveMQ。以下是对这个主题的详细说明: 1. **配置ActiveMQ**: - 首先,我们需要在项目中引入ActiveMQ的相关依赖,这通常通过在`...
队列采用点对点的方式传递消息,每个消息只能被一个消费者接收;主题则支持发布/订阅模式,多个订阅者可以接收相同的消息。 3. **JmsTemplate**: Spring提供的一个便捷工具类,用于发送和接收消息。通过它可以简化...
8. **Spring Boot与ActiveMQ**:在Spring Boot应用中,可以利用 starters 来简化配置,通过`spring.activemq.broker-url`和`spring.activemq.user`等属性快速设置ActiveMQ连接。 9. **消息确认**:ActiveMQ支持两种...
1. **ActiveMQ的基本概念**:包括Broker(消息代理)、Producer(生产者)、Consumer(消费者)、Topic(发布/订阅模型)、Queue(点对点模型)等基本元素。 2. **ActiveMQ的安装与配置**:讲解如何下载ActiveMQ,...
6. **消息传输**:Spring支持点对点(Queue)和发布/订阅(Topic)两种模式。Queue中,消息被一个消费者接收后,其他消费者无法再收到;而Topic中,消息可以被多个订阅者接收。 7. **事务管理**:Spring的JMS支持与...
本文将详细介绍如何在Spring Boot项目中集成ActiveMQ,以及利用其特性实现点对点和发布/订阅模式的消息通信。 首先,集成ActiveMQ到Spring Boot项目中,我们需要在`pom.xml`文件中添加ActiveMQ的依赖。对于Spring ...
它支持多种协议,如TCP/IP、HTTP、HTTPS等,并且可以处理多种消息类型,如点对点(Queue)、发布/订阅(Topic)模式。 3. **Spring与ActiveMQ整合**:整合Spring和ActiveMQ的主要目的是利用消息队列实现服务间的...
4. **定义Destination**:定义消息的目的地,可以是Queue(点对点)或Topic(发布/订阅模型)。 5. **配置MessageListenerContainer**:创建`DefaultMessageListenerContainer`,用于监听消息。可以设置并发消费者...
4. **配置Destination**:JMS中的目的地可以是Queue(点对点模型)或Topic(发布/订阅模型)。根据需求,配置相应的`Queue`或`Topic`,这将作为生产者发送消息和消费者接收消息的目标。 5. **配置MessageProducer和...
Queue支持点对点通信,而Topic支持发布/订阅模式。 3. **Message Listener Container**:Spring提供两种类型的消息监听容器,`DefaultMessageListenerContainer`和`SimpleMessageListenerContainer`,它们负责启动...
Apache ActiveMQ 是一个开源的消息中间件,它实现了Java消息服务(JMS)1.1规范,适用于J2EE 1.4及更高版本的环境。...无论是简单的点对点通信还是复杂的发布/订阅模式,开发者都能借助ActiveMQ轻松实现。
ActiveMQ 5是其一个版本,提供了多种协议支持,如TCP、SSL、NIO等,以及各种消息模式,如点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)等。它能够确保消息的可靠传输,并允许在分布式环境中进行高效的...