0 0

ACTIVEMQ Topic消息 生产者 发布消息后 消费者收不到消息5

 

生产者

public class Producer {
	 public static void main(String[] args) throws JMSException {  
	        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
	        Connection connection = factory.createConnection();  
	        connection.start();  
	          
	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
	        ActiveMQTopic topic= new ActiveMQTopic("testTopic");  
	  
	        MessageProducer producer = session.createProducer(topic);  
	       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
	  
	
	        for(int i=0; i<10; i++){
	            TextMessage message = session.createTextMessage();  
	           message.setText("message_" + System.currentTimeMillis());  
	           producer.send(message);  
	           System.out.println("Sent message: " + message.getText());  
	        }
	          
	       
	  
//	      session.close();  
//	      connection.stop();  
//	      connection.close();  
	    }  
}

发布消息的结果
Sent message: message_1341915173083
Sent message: message_1341915173085
Sent message: message_1341915173085
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173087
Sent message: message_1341915173087
Sent message: message_1341915173088
Sent message: message_1341915173088






消费者
public class Consumer {
	public static void main(String[] args) throws JMSException {  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        Connection connection = factory.createConnection();  
        connection.start();  
          
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
        ActiveMQTopic topic= new ActiveMQTopic("testTopic");
       // javax.jms.Topic topic =  session.createTopic("myTopic.messages");  
  
        MessageConsumer consumer = session.createConsumer( topic);  
        consumer.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message: " + tm.getText());  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        
        
        
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener(){
        	 public void onMessage(Message message) {  
                 TextMessage tm = (TextMessage) message;  
                 try {  
                     System.out.println("Received message: " + tm.getText());  
                 } catch (JMSException e) {  
                     e.printStackTrace();  
                 }  
             }  
        });
//      session.close();  
//      connection.stop();  
//      connection.close();  
    }  
}
消费者运行程序后后获取不到生产者发布的消息,初识 ActiveMQ不太熟悉,  求解答
 

 

2012年7月10日 18:17

1个答案 按时间排序 按投票排序

0 0

采纳的答案

亲测 搞了个小例子 我的例子代码来自:
http://www.open-open.com/lib/view/open1328079945062.html
然后跑了一下 消费者能接到消息

ActiveMQTopic topic= new ActiveMQTopic("testTopic");    怀疑你这个代码原因
觉得应该用这种方式
Topic topic = session.createTopic("testTopic");

另外我测试过程我写了个小博客 我的博客地址:http://babydeed.iteye.com/blog/1584561 里面含有我的代码

2012年7月10日 21:38

相关推荐

    Apache ActiveMQ Queue Topic 详解

    - **启动 Topic 消息消费者**:运行 `cd example` 后执行 `ant topic-listener`。 - **启动 Topic 消息生产者**:运行 `cd example` 后执行 `ant topic-publisher`。此示例重复 10 轮,每轮发送 2000 条消息,并...

    Spring+ActiveMQ消息队列+前台接收消息

    2. **创建消息生产者**:在Spring中,你可以使用`JmsTemplate`作为消息生产者,发送消息到ActiveMQ的队列或主题。配置`JmsTemplate`并设置ActiveMQ的连接工厂,然后在需要发送消息的地方调用其`convertAndSend`方法...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    4. 定义消息消费者:创建一个监听特定主题的MessageListener。当有消息到达时,这个监听器会被触发并处理消息。 5. 编写消息发送接口:为了将消息发送逻辑封装起来,可以创建一个服务类,提供发送消息的方法,这些...

    7道消息队列ActiveMQ面试题!

    在发布/订阅模式中,消息生产者发布消息到主题(Topic),多个消费者可以订阅该主题,接收消息。 2. ActiveMQ的存储机制和故障处理 ActiveMQ的存储机制包括非持久化消息和持久化消息两种方式。非持久化消息存储在...

    activemq-cpp发送接收消息,消息过滤器

    首先,`activemq-cpp`库为开发者提供了一个直观的API,用于创建生产者(Producer)和消费者(Consumer)来发送和接收消息。发送消息的基本步骤包括: 1. **初始化连接**:创建一个`ConnectionFactory`实例,然后...

    springboot2整合activemq的demo内含queue消息和topic消息

    - demo项目中应该包含生产者和消费者的相关代码,演示如何创建消息、发送到Queue或Topic,以及如何接收和处理消息。 - 可能包含`@JmsListener`注解用于定义消息监听器,以及`JmsTemplate`用于发送消息。 8. **...

    消息队列activeMQ

    6. **创建消费者**:创建一个消息消费者,用于接收消息。 7. **发送和接收消息**:在生产者上调用`send()`方法发送消息,消费者通过回调接口或者拉取模式接收消息。 8. **关闭资源**:在完成操作后,记得关闭连接、...

    activeMQ三种收发消息方式

    如果消费者在接收到消息后确认接收,ActiveMQ会将其从队列中删除。这种模式适用于需要可靠、顺序处理消息的场景。 2. **发布/订阅(Publish/Subscribe)模式**: 发布/订阅模式中,消息从生产者发送到主题,而不是...

    7道消息队列ActiveMQ面试题!.zip

    - **Topic**:发布/订阅模式,多个消费者可以订阅同一个主题,接收到所有发布的消息。 5. **ActiveMQ的消息持久化方式有哪些?** ActiveMQ提供两种持久化方式:内存持久化和文件系统持久化。内存持久化速度快,但...

    activemq_spring.rar_Spring和ActiveMQ_spring_消息中间件_消息发布订阅_消息订阅

    5. **创建消息消费者**:消费者可以是监听器容器(如DefaultMessageListenerContainer)或者基于回调的MessageListener。监听器容器会在后台自动创建连接,监听消息,并将接收到的消息分发给相应的MessageListener。...

    消息队列 Queue与Topic区别.docx

    - **单向性**:一个消息生产者(Producer)可以将消息发送到队列中,而这些消息最终会被一个消息消费者(Consumer)接收。 - **消息消费**:每条消息只会被一个消费者接收,一旦被接收后,消息将从队列中移除。 - **...

    activemq-example

    消费者可以设置为持久订阅,即使在消息到达时消费者不在线,也能在恢复连接后接收到未消费的消息。 3. 代理(Broker):ActiveMQ服务器作为消息代理,负责存储、路由和传递消息,确保消息的安全性和可靠性。 三、...

    动态创建ActiveMQ消费者

    4. **创建消费者**:现在我们可以使用会话创建一个消息消费者。消费者用于接收从队列或主题发送的消息: ```java MessageConsumer consumer = session.createConsumer(destination); ``` 5. **监听消息**:为了...

    activeMq点对点和发布/订阅模式demo

    1. 广播消息:生产者发布消息到一个主题,所有订阅了该主题的消费者都能接收到消息。 2. 多对多交付:一个消息可以被多个消费者同时接收,实现了广播效果。 3. 动态订阅:消费者可以随时订阅或取消订阅主题,不影响...

    ActiveMQ接受和发送工具.rar

    - **Topic**:多播模式,消息可以被多个消费者同时接收。 3. **消息模型**:ActiveMQ支持两种消息模型——点对点(Queue)和发布/订阅(Topic)。点对点模型适用于一对一通信,而发布/订阅模型适用于一对多广播式...

    ActiveMQ的简单例子

    当一个消费者成功处理消息后,该消息会被从队列中移除。 **发布/订阅模式(Pub/Sub)** 在发布/订阅模式中,消息被广播给多个订阅者。这种模式中的消息容器称为主题(Topic)。每个订阅者都可以接收到所有发送到...

    Apache_ActiveMQ教程

    消费者则需要实现MessageListener接口,并在onMessage()方法中处理消息,创建连接,并基于该连接创建会话,设置消息监听器,最后创建消息消费者。 ActiveMQ的管理工具和API提供了丰富的功能来处理消息的传输、存储...

    activeMQ消息中间件入门示例,行行注释

    在这个“activeMQ消息中间件入门示例”中,我们将探讨如何设置基本的生产者和消费者来实现消息传递。 首先,让我们了解什么是消息中间件。消息中间件是一种软件系统,它允许不同的应用之间通过消息进行通信,而不是...

    ActiveMQ技术研究要点

    6. **消息消费者**(Message Consumer):由会话创建,用于接收来自目的地的消息。消费方式有同步(通过调用receive方法等待消息)和异步(通过消息监听器)。 7. **消息**(Message):包含消息头、属性和消息体。...

    JMS_ActiveMQ交流学习

    - 在此模型中,消息生产者发布消息到主题,而消息消费者订阅该主题以接收消息。 - 特点:消息可以被多个消费者订阅并消费;生产者与消费者存在时间上的关联,即消费者只能消费在其订阅之后发布的消息。 #### 三、...

Global site tag (gtag.js) - Google Analytics