最近需要用到activemq的topic发布订阅功能
activemq可以到官网下载,下载完成后启动很简单,bin/activemq start
到时候可以访问管理控制台,密码是admin/admin,
http://192.168.91.128:8161/admin/topics.jsp, 128是我的部署activemq的ip地址,用的是redhat6.4
mq broker搭建好之后,我们首先需要开发publish程序,这里我们通过tomcat的jndi来完成,
新建一个web项目,然后META-INF里面创建context.xml,内容如下,
<Context antiJARLocking="true"> <Resource name="jms/ConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="tcp://192.168.91.128:61616" brokerName="LocalActiveMQBroker" useEmbeddedBroker="false"/> <Resource name="jms/topic/MyTopic" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="MY.TEST.FOO"/> </Context>
完成之后写个servlet来获取jnid中配置的factory和topic信息,代码如下
InitialContext initCtx = new InitialContext(); Context envContext = (Context) initCtx.lookup("java:comp/env"); ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/ConnectionFactory"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer((Destination) envContext.lookup("jms/topic/MyTopic")); Message testMessage = session.createMessage(); testMessage.setStringProperty("testKey", "testValue111"); producer.send(testMessage);
deploy应用之前,需要将activemq-all-5.9.0.jar放入到tomcat/lib下面,启动tomcat,接着部署刚才的 servlet应用,启动tomcat后topic不会被自动创建,然后可以访问这个servlet,会发现topic会被创建,可以观察控制台
接着是消费者subscriber,
下面是普通的main方法来消费topic
package com.activemqtest; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Reciever { protected Topic queue; protected String queueName = "MY.TEST.FOO"; protected String url = "tcp://192.168.91.128:61616"; protected int ackMode = Session.AUTO_ACKNOWLEDGE; public static void main(String[] args) { Reciever rec = new Reciever(); try { rec.run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); TopicConnection connection = (TopicConnection) connectionFactory .createTopicConnection(); connection.start(); MessageConsumer consumer = null; Session session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createTopic(queueName); consumer = session.createConsumer(queue); System.out.println(" Waiting for message (max 5) "); for (int i = 0; i < 3; i++) { Message message = consumer.receive(); processMessage(message); } System.out.println("Closing connection"); consumer.close(); session.close(); connection.close(); } public void processMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println(message.getStringProperty("testKey")); } } catch (Exception e) { e.printStackTrace(); } } }
总结:
Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
可以发现topic的消费者需要active,你不能等到publish完成之后,你启动subscriber,这样是接受不到消息的,但是queue的话不一样,不一定active才可以接受消息
另外有个问题是,tomcat只是一个servlet容器,并没有实现很多的j2ee标准比如jms,所以里面是没有javax.jms下面这些package的,需要单独下载J2ee.jar
相关推荐
在"apache-activemq-5.13.0-bin.tar.gz"这个压缩包中,包含了Apache ActiveMQ 5.13.0版本的所有相关文件,这将帮助我们搭建和运行一个完整的ActiveMQ服务器。 Apache ActiveMQ的核心功能包括: 1. **消息中间件**...
**Spring与ActiveMQ搭建JMS开发系统示例详解** 在Java世界中,消息队列(JMS,Java Message Service)是一种标准,它定义了API来创建、发送、接收和读取消息,允许应用程序进行异步通信。Spring框架是Java开发中的...
在Spring Boot应用中整合ActiveMQ和WebSocket,可以创建一个实时通信系统,使后端服务能够高效地推送消息到前端客户端。以下将详细解释这个过程的关键知识点: 1. **ActiveMQ**:Apache ActiveMQ是一个开源的消息...
- 开发者在实际项目中,可以根据需求选择Queue或Topic,编写相应的消息处理器,并调整ActiveMQ配置以适应生产环境。 9. **测试与部署**: - 在本地运行这个demo,首先确保安装并启动了ActiveMQ服务器,然后修改...
5. **消息队列和主题**:ActiveMQ提供两种消息模式:点对点(Queue)和发布/订阅(Topic)。点对点模式中,每个消息只有一个消费者,而发布/订阅模式下,一个消息可以被多个订阅者接收。 6. **消息过滤**:ActiveMQ...
2. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等,以满足不同应用场景的需求。 3. **高级路由和过滤**:ActiveMQ提供了多种消息路由策略,如...
本教程将深入探讨如何利用Apache ActiveMQ搭建、配置和集成到Spring框架中,以实现高效的数据通信。 首先,我们需要了解JMS的基本概念。Java消息服务(JMS)是Java平台上的一个标准接口,它定义了应用程序如何创建...
在本文中,我们将深入探讨如何通过 Maven 搭建 ActiveMQ 应用,并逐步理解入门示例代码 `activemqDemo01`。 ### 一、Maven 搭建环境 在开始之前,确保你的开发环境中已经安装了 Java 和 Maven。Maven 是一个项目...
在这个"ActiveMQ简单Demo案例"中,我们将探讨如何使用ActiveMQ搭建服务器,并创建生成者(Producer)和消费者(Consumer)对象。 首先,我们需要理解JMS的概念。JMS是一个标准,定义了与消息传递系统交互的API,...
SpringBoot是Spring框架的简化版,旨在简化Spring应用的初始搭建以及开发过程。它通过内嵌的Tomcat服务器和自动配置功能,使得开发者能够快速创建独立运行的、生产级别的Java应用。 接下来是ActiveMQ,它是Apache...
4. 路由与过滤:ActiveMQ允许消息路由到特定的消费者,支持主题(Topic)和队列(Queue)两种模式,以及基于消息属性的过滤。 5. 安全性:支持用户认证和授权,可以保护消息的安全传输。 在"activemq_basic.rar"中...
在IT行业中,Spring Boot是一个非常流行的微服务框架,它简化了Spring应用的初始搭建以及开发过程。而ActiveMQ则是Apache出品的一款开源的消息中间件,它实现了多种消息协议,如OpenWire、AMQP、STOMP等,是Java消息...
同时,动手实践是提升技能的关键,尝试搭建一个简单的ActiveMQ环境,并编写一些示例应用来发送和接收消息。 总之,ActiveMQ作为一款强大的消息中间件,对于理解和实践分布式系统设计至关重要。通过深入学习和实践,...
11. **内存中的JMS提供者**:在单元测试场景下,ActiveMQ可以作为一个内存中的消息提供者,简化测试环境搭建。 ActiveMQ在实际工作中的作用主要包括: 1. **异步处理**:消息中间件使得应用可以将耗时操作异步化,...
- **Topic与Queue**:在JMS中,有两种消息模式,即主题(Topic)和队列(Queue)。在聊天室中,我们可能使用主题,因为每个消息都应广播给所有订阅者。 3. **ActiveMQ安装与配置**: - 下载并安装ActiveMQ服务器...
综上所述,`( apache-activemq-5.13.0-bin.zip )`是一个包含Apache ActiveMQ 5.13.0版本所有必要文件的压缩包,通过它,用户可以搭建起一个功能强大的消息中间件服务器,为分布式系统提供稳定的消息传递服务。...
Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。...通过理解上述步骤,开发者可以快速搭建起生产者和消费者的示例,进一步探索JMS在实际项目中的应用。
在IT行业中,Spring Boot是一个非常流行的Java开发框架,它简化了Spring应用的初始搭建以及开发过程。而ActiveMQ则是Apache出品的一款开源消息中间件,它实现了多种消息协议,如JMS(Java Message Service),用于在...