`
ll_feng
  • 浏览: 389863 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

activeMO学习笔记二:发布和订阅

    博客分类:
  • j2ee
 
阅读更多
其实学习activeMQ的初衷就是要找一个能够实现异步消息的发布/订阅机制的解决方案。
一、要启动消息代理
也就是一个broker.在上一篇中,我是自建了一个内嵌的broker. 经过进一步的了解,实际上利用官方的代理才是最常见的应用场景。
这里直接执行官方解压包下的activemq.bat
若是要启动特定配置的broker.先确认配置文件如activemq-demo.xml存在于conf下。执行:
./activemq xbean:activemq-demo.xml 


二、创建消息发布者
在一个java 工程中来创建发布者类
package test.mq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicProducer {

	    public static void main(String[] args) throws JMSException {  
	        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
	        Connection connection = factory.createConnection();  
	        connection.start();  
	          
	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
	        Topic topic = session.createTopic("Topics.alert");  
	  
	        MessageProducer producer = session.createProducer(topic);  
	        producer.setTimeToLive(60000);	//设定消息过期时间
	        //producer.setDeliveryMode(DeliveryMode.PERSISTENT);	//设置消息的持久化为永不过期  
	        

	        int i= 1;
	        while(i<10) {  
	            TextMessage message = session.createTextMessage();  
	            message.setText("message_"+ i +" : "+ System.currentTimeMillis());  
	            producer.send(message);  
	            System.out.println("Sent message: " + message.getText());  
	            i++;
	            try {  
	                Thread.sleep(1000);  
	            } catch (InterruptedException e) {  
	                e.printStackTrace();  
	            }  
	        }  
	  
//	      session.close();  
//	      connection.stop();  
//	      connection.close();  
	    }   
}


三、创建消息订阅者
在一个java工程中创建订阅者类
package test.mq;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicConsumer {
    public static void main(String[] args) throws JMSException {  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        Connection connection = factory.createConnection();  //factory.createTopicConnection();//
        //若是持久订阅,这里必须设置当前订阅者的id,并在第一次运行时注册到borker上
        connection.setClientID("zhangsan");	
        connection.start();  
          
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
        Topic topic = session.createTopic("Topics.alert");  
  
        //MessageConsumer consumer = session.createConsumer(topic);
        //如果是持久订阅,用下面的两行,替代上面的一行
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "zhangsan");
        subscriber.setMessageListener(new MessageListener() {
        //consumer.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message: " + tm.getText());  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
//      session.close();  
//      connection.stop();  
//      connection.close();  
    } 
}


四、测试
1、运行发布者类TopicProducer.java.在确认成功后。
2、运行订阅者类TppicConsumer.java.可以看到所有的消息都被接收过来了。
3、还可以修改发布者的主题(topic)后测试,发现订阅者接受到不任务消息。
4、还可以确认两个客户端的主题一致的前提下,在发布完成后,将发布者停止后,再启动订阅者。可以发现订阅都照样可以接受该主题的消息,这一切说明持久化的发布/订阅是成功了!

五、小结:
1、推荐使用官方的broker.
2、两个客户端(发布者和订阅者)都要设置相同的主题,即:
Topic topic = session.createTopic("Topics.alert"); //Topics.alert就是主题名

3、若是希望订阅者能够事后接收在它不活时所发布的消息,必须为订阅者设置ID,即:
connection.setClientID("zhangsan");	

这个id可以随意起名,会在订阅者第一次连接broker时,注册到代理服务器上。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics