`
cctvx1
  • 浏览: 79892 次
社区版块
存档分类
最新评论

ActiveMQ 实践之路(二) 使用Queue或者Topic发送/接受消息

阅读更多

本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。
    
     一.JMS回顾
       因为ActiveMQ是一个JMS Provider的实现,因此在开始实作前,有必要复习下JMS的基础知识
    Java Message Service (JMS)是sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS 1.0.2b和1.1. 1.1已经成为主流的JMS Provider事实上的标准了.
      *1.1主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受
Topic消息。
      
       在JMS中间主要定义了2种消息模式Point-to-Point (点对点),Publich/Subscribe Model (发布/订阅者),
    其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式。
    
     下面是JMS规范基本的接口和实现
     JMS Common Interfacse PTP-Specific Interface   Pub/Sub-specific interfaces
     ConnectionFactory     QueueConnectionFactory   TopicConnectionFactory
     Connection            QueueConnection          TopicConnection
     Destination           Queue                    Topic
     Session               QueueSession             TopiSession
     MessageProducer       QueueSender              TopicPublisher
     MessageConsumer       QueueReceiver/QueueBrwer TopicSubscriber


     二.使用Queue

         下面以ActiveMQ example的代码为主进行说明
        
        1. 使用ActiveMQ的Connection,ConnectionFactory 建立连接,注意这里没有用到pool
       

java 代码
  1. import org.apache.activemq.ActiveMQConnection   
  2. import org.apache.activemq.ActiveMQConnectionFactory   

        //建立Connection

java 代码
  1. protected Connection createConnection() throws JMSException, Exception {   
  2.      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);   
  3.      Connection connection = connectionFactory.createConnection();   
  4.      if (durable && clientID!=null) {   
  5.          connection.setClientID(clientID);   
  6.      }   
  7.      connection.start();   
  8.      return connection;   
  9.     }  

        //建立Session
  

java 代码
  1. protected Session createSession(Connection connection) throws Exception {   
  2.          Session session = connection.createSession(transacted, ackMode);   
  3.          return session;   
  4.         }   

        2。发送消息的代码
 //建立QueueSession
 

java 代码
  1. protected MessageProducer createProducer(Session session) throws JMSException {   
  2.         Destincation destination = session.createQueue("queue.hello");   
  3.         MessageProducer producer = session.createProducer(destination);   
  4.         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
  5.            
  6.         if( timeToLive!=0 )   
  7.             producer.setTimeToLive(timeToLive);   
  8.         return producer;   
  9.         }   

         //使用Producer发送消息到Queue
    

java 代码
  1. producer.send(message);   

       
        3。接受消息,在JMS规范里面,你可以使用
  

java 代码
  1. QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口   
  2.  public void onMessage(Message message) {   
  3.  //process message   
  4.  }   
  5.           
  6.  //set MessageListner ,receive message   
  7.  Destincation destination = session.createQueue("queue.hello");   
  8.  consumer = session.createConsumer(destination);   
  9.  consumer.setMessageListener(this);   

       
        以上就是使用jms queue发送接受消息的基本方式

 
     三 Topic

        1. 建立连接
   

java 代码
  1. protected Connection createConnection() throws JMSException, Exception {      
  2.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);      
  3.         Connection connection = connectionFactory.createConnection();      
  4.         //如果你要使用DurableSubScription 方式,你必须为connection设置一个ClientID      
  5.         if (durable && clientID!=null) {      
  6.             connection.setClientID(clientID);      
  7.         }      
  8.         connection.start();      
  9.         return connection;      
  10.        }      

       2. 建立Session

java 代码
  1. protected Session createSession(Connection connection) throws Exception {      
  2.         Session session = connection.createSession(transacted, ackMode);      
  3.         return session;      
  4.         }    

 3.创建Producer 发送消息到Topic   
       

java 代码
  1. //create topic on  session   
  2.        topic = session.createTopic("topic.hello");   
  3.  producer = session.createProducer(topic);   
  4.        //send message    
  5.        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
  6.  producer.send(message);   


 4.创建Consumer接受消息(基本上和Queue相同)

java 代码
  1. Destincation destination  = session.createTopic("topic.hello");      
  2. MessageConsumer consumer = session.createConsumer(destination);      
  3. consumer.setMessageListener(this);      
  4.            
  5.      //如果你使用的是Durable Subscription方式,你必须在建立connection的时候      
  6.      //设置ClientID,而且建立comsumer的时候使用createDurableSubscriber方法,为他指定一个consumerName。      
  7.  //connection.setClientID(clientId);      
  8.  //consumer = session.createDurableSubscriber((Topic) destination, consumerName);   

       
 四:连接ActiveMQ的方式
        ActiveMQConnectionFactory 提供了多种连接到Broker的方式activemq.apache.org/uri-protocols.html

 常见的有
 vm://host:port     //vm
 tcp://host:port    //tcp
 ssl://host:port    //SSL
 stomp://host:port  //stomp协议可以跨语言,目前有很多种stomp client 库(java,c#,c/c++,ruby,python...);

分享到:
评论
2 楼 cctvx1 2007-02-27  
其实ActiveMQ现在已经和作为很多项目的异步消息通信核心了
包括Mule,ServiceMix,Geronimo,同时它又进一步为Spring2.0提供了支持,这些在后续的文章里面都会陆续提到。
1 楼 SteveGY 2007-02-27  
单独的MQ,用起来不太爽,如果和J2EE服务器一起使用,最好要一个RAR,但这类RAR还是比较麻烦的,虽然ActiveMQ提供了一个,但适用的AppServer都有限。
单独项目中使用带事务完整性支持的分布式队列,在工程上是有一定复杂度的,关键是事务完整性。

相关推荐

    Apache ActiveMQ Queue Topic 详解

    ### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...

    springboot2整合activemq的demo内含queue消息和topic消息

    - demo项目中应该包含生产者和消费者的相关代码,演示如何创建消息、发送到Queue或Topic,以及如何接收和处理消息。 - 可能包含`@JmsListener`注解用于定义消息监听器,以及`JmsTemplate`用于发送消息。 8. **...

    消息队列 Queue与Topic区别.docx

    ### 消息队列Queue与Topic的区别 #### 一、概念概述 消息队列(Message Queue)是一种应用程序间通信机制,允许程序之间通过发送和接收消息进行通信,而不必直接建立连接。它提供了异步处理机制,使得消息的发送者...

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    用C#实现的ActiveMQ发布/订阅消息传送

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是...

    Queue与Topic的比较

    如果消息到达指定 Topic 之后,JMS Provider 只会为已经连接并且订阅了该指定 Topic 的消息订阅者发送消息, 如果消息到达之后你恰好不在,那不好意思,你将接收不到这一消息。 Durable Subscription 是一种持久化...

    ActiveMQ实践入门指南_ActiveMQ实践入门指南_源码

    **ActiveMQ实践入门指南** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。ActiveMQ以其高性能、高可靠性和易于管理的特点,在企业级应用中备受青睐。...

    spring集成activemq演示queue和topic 持久化

    return new ActiveMQQueue("myQueue"); } @Bean public Topic topic() { return new ActiveMQTopic("myTopic"); } @Bean public JmsListenerContainerFactory<?> queueListenerContainerFactory...

    ActiveMQ接受和发送工具.rar

    在压缩包中的"ActiveMQ接受和发送工具"很可能包含了一个图形界面或者命令行工具,使得用户可以更直观地发送测试消息到ActiveMQ服务器,查看消息队列的状态,以及接收消息。使用这些工具,开发者可以快速验证ActiveMQ...

    activeMQ发送消息返回消息

    6. **发送消息**:使用Producer和创建好的Message,我们可以调用Producer的send方法将消息发送到目的地,如Queue或Topic。 7. **接收消息**:创建MessageConsumer,它可以从Queue或Topic接收消息。你可以设置监听器...

    activemq 虚拟topic与路由功能

    2. **消息确认机制**:使用虚拟Topic时,需要确保消息的确认机制正确无误。例如,如果消息被复制到了多个目的地,那么应该明确指定哪些目的地需要返回确认。 3. **配置管理**:随着系统的扩展,虚拟Topic的配置可能...

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。 一、消息过期时间设置 在 ActiveMQ 中,可以通过设置时间戳插件来实现...

    spring使用activeMQ实现消息发送

    5. **队列和主题的区别**:在ActiveMQ中,消息可以发送到队列(Queue)或主题(Topic)。队列遵循一对一模型,每个消息仅被一个消费者接收;主题遵循一对多模型,一个消息可以被多个订阅者接收。在上述示例中,我们...

    activeMQ收发工具.rar

    3. **消息类型**:理解JMS提供的不同消息类型,如文本消息、对象消息、流消息和二进制消息,以及如何通过ActiveMQ收发工具发送和接收这些消息。 4. **队列与主题**:熟悉ActiveMQ中的队列(Queue)和主题(Topic)...

    ActiveMQ的queue和topic两种模式的示例演示参照.pdf

    在ActiveMQ中,有两种主要的消息传递模式:Queue和Topic。这两种模式都是基于JMS(Java Message Service)标准,用于在分布式环境中实现异步通信。理解它们的区别和应用场景至关重要。 **Queue(队列)模式** ...

    ActiveMQ的队列、topic模式

    本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic)。 1. **队列(Queue)模式**: 队列模式遵循“发布/订阅”模型,但是一对一的。每个消息只能被一个消费者接收并处理。当一个消息被...

    ActiveMQ中Topic持久化Demo

    在分布式系统中,消息队列(Message Queue)作为解耦组件和异步处理的重要工具,Apache ActiveMQ 是一款广泛使用的开源消息中间件。本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在...

    ActiveMQ Topic 实例

    ActiveMQ Topic与Queue的主要区别在于消息分发方式。Queue采用点对点模型,每个消息只能被一个消费者接收并删除;而Topic遵循发布/订阅模型,多个订阅者可以同时接收到相同的消息。Topic适用于广播式通信,例如股票...

    activemq-cpp发送接收消息,消息过滤器

    在本文中,我们将深入探讨如何使用`activemq-cpp`库在C++环境中发送和接收消息,并利用消息过滤器来实现特定的消息处理。`activemq-cpp`是Apache ActiveMQ的一个C++客户端,它提供了与ActiveMQ服务器进行交互的能力...

    MessageQueue API ActiveMQ Core 5_2_0_0-fuse API

    这个例子展示了如何使用ActiveMQ API创建连接、会话,创建并发送消息到队列,然后从队列中接收并打印消息。 总结来说,ActiveMQ Core 5.2.0.0-fuse API是实现JMS规范的一个强大工具,提供了丰富的功能和优秀的性能...

Global site tag (gtag.js) - Google Analytics