-
ACTIVEMQ Topic消息 生产者 发布消息后 消费者收不到消息5
生产者 public class Producer { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQTopic topic= new ActiveMQTopic("testTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(int i=0; i<10; i++){ TextMessage message = session.createTextMessage(); message.setText("message_" + System.currentTimeMillis()); producer.send(message); System.out.println("Sent message: " + message.getText()); } // session.close(); // connection.stop(); // connection.close(); } } 发布消息的结果 Sent message: message_1341915173083 Sent message: message_1341915173085 Sent message: message_1341915173085 Sent message: message_1341915173086 Sent message: message_1341915173086 Sent message: message_1341915173086 Sent message: message_1341915173087 Sent message: message_1341915173087 Sent message: message_1341915173088 Sent message: message_1341915173088 消费者 public class Consumer { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQTopic topic= new ActiveMQTopic("testTopic"); // javax.jms.Topic topic = session.createTopic("myTopic.messages"); MessageConsumer consumer = session.createConsumer( topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); MessageConsumer comsumer2 = session.createConsumer(topic); comsumer2.setMessageListener(new MessageListener(){ public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // session.close(); // connection.stop(); // connection.close(); } } 消费者运行程序后后获取不到生产者发布的消息,初识 ActiveMQ不太熟悉, 求解答
2012年7月10日 18:17
1个答案 按时间排序 按投票排序
-
采纳的答案
亲测 搞了个小例子 我的例子代码来自:
http://www.open-open.com/lib/view/open1328079945062.html
然后跑了一下 消费者能接到消息
ActiveMQTopic topic= new ActiveMQTopic("testTopic"); 怀疑你这个代码原因
觉得应该用这种方式
Topic topic = session.createTopic("testTopic");
另外我测试过程我写了个小博客 我的博客地址:http://babydeed.iteye.com/blog/1584561 里面含有我的代码2012年7月10日 21:38
相关推荐
- **启动 Topic 消息消费者**:运行 `cd example` 后执行 `ant topic-listener`。 - **启动 Topic 消息生产者**:运行 `cd example` 后执行 `ant topic-publisher`。此示例重复 10 轮,每轮发送 2000 条消息,并...
2. **创建消息生产者**:在Spring中,你可以使用`JmsTemplate`作为消息生产者,发送消息到ActiveMQ的队列或主题。配置`JmsTemplate`并设置ActiveMQ的连接工厂,然后在需要发送消息的地方调用其`convertAndSend`方法...
4. 定义消息消费者:创建一个监听特定主题的MessageListener。当有消息到达时,这个监听器会被触发并处理消息。 5. 编写消息发送接口:为了将消息发送逻辑封装起来,可以创建一个服务类,提供发送消息的方法,这些...
在发布/订阅模式中,消息生产者发布消息到主题(Topic),多个消费者可以订阅该主题,接收消息。 2. ActiveMQ的存储机制和故障处理 ActiveMQ的存储机制包括非持久化消息和持久化消息两种方式。非持久化消息存储在...
首先,`activemq-cpp`库为开发者提供了一个直观的API,用于创建生产者(Producer)和消费者(Consumer)来发送和接收消息。发送消息的基本步骤包括: 1. **初始化连接**:创建一个`ConnectionFactory`实例,然后...
- demo项目中应该包含生产者和消费者的相关代码,演示如何创建消息、发送到Queue或Topic,以及如何接收和处理消息。 - 可能包含`@JmsListener`注解用于定义消息监听器,以及`JmsTemplate`用于发送消息。 8. **...
6. **创建消费者**:创建一个消息消费者,用于接收消息。 7. **发送和接收消息**:在生产者上调用`send()`方法发送消息,消费者通过回调接口或者拉取模式接收消息。 8. **关闭资源**:在完成操作后,记得关闭连接、...
如果消费者在接收到消息后确认接收,ActiveMQ会将其从队列中删除。这种模式适用于需要可靠、顺序处理消息的场景。 2. **发布/订阅(Publish/Subscribe)模式**: 发布/订阅模式中,消息从生产者发送到主题,而不是...
- **Topic**:发布/订阅模式,多个消费者可以订阅同一个主题,接收到所有发布的消息。 5. **ActiveMQ的消息持久化方式有哪些?** ActiveMQ提供两种持久化方式:内存持久化和文件系统持久化。内存持久化速度快,但...
5. **创建消息消费者**:消费者可以是监听器容器(如DefaultMessageListenerContainer)或者基于回调的MessageListener。监听器容器会在后台自动创建连接,监听消息,并将接收到的消息分发给相应的MessageListener。...
- **单向性**:一个消息生产者(Producer)可以将消息发送到队列中,而这些消息最终会被一个消息消费者(Consumer)接收。 - **消息消费**:每条消息只会被一个消费者接收,一旦被接收后,消息将从队列中移除。 - **...
消费者可以设置为持久订阅,即使在消息到达时消费者不在线,也能在恢复连接后接收到未消费的消息。 3. 代理(Broker):ActiveMQ服务器作为消息代理,负责存储、路由和传递消息,确保消息的安全性和可靠性。 三、...
4. **创建消费者**:现在我们可以使用会话创建一个消息消费者。消费者用于接收从队列或主题发送的消息: ```java MessageConsumer consumer = session.createConsumer(destination); ``` 5. **监听消息**:为了...
1. 广播消息:生产者发布消息到一个主题,所有订阅了该主题的消费者都能接收到消息。 2. 多对多交付:一个消息可以被多个消费者同时接收,实现了广播效果。 3. 动态订阅:消费者可以随时订阅或取消订阅主题,不影响...
- **Topic**:多播模式,消息可以被多个消费者同时接收。 3. **消息模型**:ActiveMQ支持两种消息模型——点对点(Queue)和发布/订阅(Topic)。点对点模型适用于一对一通信,而发布/订阅模型适用于一对多广播式...
当一个消费者成功处理消息后,该消息会被从队列中移除。 **发布/订阅模式(Pub/Sub)** 在发布/订阅模式中,消息被广播给多个订阅者。这种模式中的消息容器称为主题(Topic)。每个订阅者都可以接收到所有发送到...
消费者则需要实现MessageListener接口,并在onMessage()方法中处理消息,创建连接,并基于该连接创建会话,设置消息监听器,最后创建消息消费者。 ActiveMQ的管理工具和API提供了丰富的功能来处理消息的传输、存储...
在这个“activeMQ消息中间件入门示例”中,我们将探讨如何设置基本的生产者和消费者来实现消息传递。 首先,让我们了解什么是消息中间件。消息中间件是一种软件系统,它允许不同的应用之间通过消息进行通信,而不是...
6. **消息消费者**(Message Consumer):由会话创建,用于接收来自目的地的消息。消费方式有同步(通过调用receive方法等待消息)和异步(通过消息监听器)。 7. **消息**(Message):包含消息头、属性和消息体。...
- 在此模型中,消息生产者发布消息到主题,而消息消费者订阅该主题以接收消息。 - 特点:消息可以被多个消费者订阅并消费;生产者与消费者存在时间上的关联,即消费者只能消费在其订阅之后发布的消息。 #### 三、...