`
黎剑发
  • 浏览: 3357 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

理解JMS规范中消息的传输模式和消息持久化

    博客分类:
  • JMS
 
阅读更多
原文  http://blog.csdn.net/aitangyong/article/details/26132913
      JMS规范定义了2种消息传输模式:持久传送模式和非持久传输模式。发送者可以通过如下类似的代码进行设置
TopicPublisher publihser = session.createPublisher(topic);


// 设置持久化传输
publihser.setDeliveryMode(DeliveryMode.PERSISTENT); 这种方式对publisher发送的所有消息都有效,相当于是一个全局的效果。如果只是想设置某一个消息的传输模式,可以通过以下代码设置消息头的属性来实现

TextMessage message = session.createTextMessage(text);
    
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);  
  

使用传输模式是一件很容易的事,直接调用API就可以了。那什么是传输模式呢?传输模式是用来控制消息属性的,DeliveryMode.PERSISTENT代表这是持久消息,DeliveryMode.NON_PERSISTENT代表是非持久消息。个人觉得传输模式和消息持久化是同一个概念,只不过是不同的叫法而已。
1.NON_PERSISTENT模式和 PERSISTENT模式
   对于非持久的消息,JMS provider不会将它存到文件/数据库等稳定的存储介质中。也就是说非持久消息驻留在内存中,如果jms provider宕机,那么内存中的非持久消息会丢失。A JMS provider must diliver a NON_PERSISTENT message at-most-once 。对于持久消息,消息提供者会使用存储-转发机制,先将消息存储到稳定介质中,等消息发送成功后再删除。如果jms provider挂掉了,那么这些未送达的消息不会丢失;jms provider恢复正常后,会重新读取这些消息,并传送给对应的消费者。 A JMS provider must diliver a PERSISTENT message once -and-only-once 。
2.消息是否持久和是否送达
    消息的持久特性就是为了在异常发生的时候保证消息的送达 。如果网络、jms provider、消息生产者、消息消费者都不会出现任何故障,那么持久消息和非持久消息就没有差别了。因为一旦消息成功传送给它的所有消费者,那么jms provider会从内存/硬盘上删除这些无用的消息。显然一切正常的情况下,使用PERSISTENT消息非常浪费, 因为持久传送消 息前,需要先将消息保存到硬盘;消息发送成功后,还需要将消息从硬盘上删除。 但现实情况是,网络可能出现断连、provider和消费者都有可能宕机。因此对于一些非常重要,不容许任何丢失的消息,一定要采用 PERSISTENT模式。
3.持久消息和持久订阅者
   我的另一篇博客   理解JMS规范中的持久订阅和非持久订阅    介绍了持久订阅者和非持久订阅者的差别。持久订阅者和持久消息有什么区别和联系吗?持久消息发送给持久订阅者和非持久订阅有什么差别?非持久消息能够发送给持久订阅者吗?下面通过一些测试代码,来阐述持久消息和持久订阅者的关系。测试代码是基于ActiveMQ5.8.0版本。

3.1生产者发送持久消息和非持久消息,但是消息没有消费者,即这是一条无用消息
package mq.aty.persistentmsg;

import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import mq.aty.JmsUtils;

/**
 * 直接运行该程序和activeMQ,不运行任何的消费者,然后观察持久化介质(我们使用了数据库)
 *
 */
public class NoReceiverTest
{
  private static TopicConnection connection = null;

  private static Topic topic = null;

  public static void main(String[] args) throws Exception
  {
    connection = JmsUtils.getConnection();
    topic = JmsUtils.getTopic();
    
    sentPersistent();
    sentNonPersistent();
    
    connection.close();
  }

  public static void sentPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "I am persistent message.order=" + i;

      TextMessage message = session.createTextMessage(text);
      
      message.setJMSPriority(i);

      publihser.publish(message);
    }

  }
  
  public static void sentNonPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "non-persistent message.id=" + i;

      TextMessage message = session.createTextMessage(text);

      publihser.publish(message);
    }

  }

} 


我使用了mysql数据库,并配置了activeMQ将消息持久化到数据库。运行上面的程序,发现mysql数据库中activemq_msgs表没有任何数据。可以证明:持久消息和非持久消息都被MQ消息服务器丢弃了。 无论是持久消息,还是非持久消息,如果消息没有对应的消费者,那么activeMQ会认为这些消息无用,直接删除。

3.2生产者发送持久消息和非持久消息,只有非持久订阅者
之前的博客已经介绍了:非持久订阅者只有在活动状态,并且和jms provider的保持连接情况下,才能收到消息。如果非持久订阅者挂掉了,那么不能再接收任何消息(无论是持久消息,还是非持久消息)。如果订阅者挂掉了,后续jms provider再收到消息,就变成了3.1的情况。也就是说:消息是否持久化,和非持久订阅者没有关系。

3.3持久消息和非持久消息,发送给离线的持久订阅者
消息的发送者源码:

package mq.aty.persistentmsg;

import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import mq.aty.JmsUtils;

/**
 * 直接运行该程序和activeMQ,没有任何的消费者,然后观察持久化介质(我们使用了数据库)
 *
 */
public class NoReceiverTest
{
  private static TopicConnection connection = null;

  private static Topic topic = null;

  public static void main(String[] args) throws Exception
  {
    connection = JmsUtils.getConnection();
    topic = JmsUtils.getTopic();
    
    sentPersistent();
    sentNonPersistent();
    
    connection.close();
  }

  public static void sentPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "I am persistent message.order=" + i;

      TextMessage message = session.createTextMessage(text);
      
      message.setJMSPriority(i);

      publihser.publish(message);
    }

  }
  
  public static void sentNonPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "non-persistent message.id=" + i;

      TextMessage message = session.createTextMessage(text);

      publihser.publish(message);
    }

  }

} 


持久订阅者源码如下:

package mq.aty.persistentmsg;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import mq.aty.JmsUtils;

/**
 * <pre>
 *  1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码
 *  
 *  2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录,
 *     也就是说activeMQ识别和接受了我们的持久订阅者
 *    
 *  3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息。发现activemq_msgs中多持久消息
 *  
 *  4、运行持久订阅者。发现持久消息和非持久消息都能接受到
 * </pre>
 * 
 */
public class DurableSubscriberTest
{

  public static void main(String[] args) throws Exception
  {
    TopicConnection connection = JmsUtils.getConnection();
    Topic topic = JmsUtils.getTopic();

    // 创建持久订阅的时候,必须要设置client,否则会报错:
    // javax.jms.JMSException: You cannot create a durable subscriber
    // without specifying a unique clientID on a Connection

    // 如果clientID重复(已经存在相同id的活动连接),会报错
    // javax.jms.InvalidClientIDException: Broker: localhost - Client: 1
    // already connected from tcp://127.0.0.1:2758
    connection.setClientID("1");

    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    // 在同一个连接的ClientID下,持久订阅者的名称必须唯一
    // javax.jms.JMSException: Durable consumer is in use for client: 1 and
    // subscriptionName: 11

    // TopicSubscriber subscriber = session.createSubscriber(topic);
    TopicSubscriber subscriber = session.createDurableSubscriber(topic,
        "11");

    subscriber.setMessageListener(new MessageListener() {

      @Override
      public void onMessage(Message msg)
      {
        try
        {
          TextMessage textMsg = (TextMessage) msg;
          System.out.println("DurableSubscriber get:"
              + textMsg.getText());
        } catch (JMSException e)
        {
          e.printStackTrace();
        }
      }
    });

    connection.start();// 一定要start
  }
}


在第二步操作的时候,查看mysql数据库可以发现,数据库表activemq_acks中多了一条记录,记录我们的持久订阅者

在第三步操作的时候,查看数据库表activemq_msgs中多了3条持久消息。可以发现activeMQ会将持久消息保存到硬盘。

最后当我们重新启动持久订阅者的时候,可以发现,持久消息和非持久消息都能够接收到。这个时候 activemq_msgs中的消息被删除。


通过这种情况测试,只能看出持久订阅者和非持久订阅者存在差别: 持久订阅者能够接收离线消息,不管该消息是不是持久消息 。
我们好像还看出持久消息和非持久消息的区别,这是因为我们进行上述测试的时候, 没有关闭activeMQ服务器,所以无论是硬盘上的持久消息,还是内存中的非持久消息,都不会丢 。

接下来我们还是使用上面的发送者和接收者源码,但是改变下操作顺序。按照如下顺序进行操作:


* <pre>
*  1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码
*
*  2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录,
*     也就是说activeMQ识别和接受了我们的持久订阅者

*  3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息

*  4、消息发送成功后,停止activemq服务器、

*  5、重新启动mq服务器和订阅者。发现只能接收到持久消息
*
* </pre>
我们发现当activeMQ服务器挂掉再重启的时候,持久订阅者只能收到持久消息,不能收到非持久消息。

4.总结
   通过上述测试代码和执行结果,我们得出以下结论:
   持久订阅者/非持久订阅者,只影响离线的时候消息(包括持久消息和非持久消息)是否能接收到,和消息是否持久无关;
   持久消息/非持久消息,只是影响jms provider宕机后。消息是否会丢失,如果永远不会宕机,那么持久消息和非持久消息没有区别。
分享到:
评论

相关推荐

    JMS1.1 规范有中文和英文版本

    另一方面,"JMS规范.pdf"可能是中文版的翻译,方便不熟悉英文的开发者理解和使用JMS API。 总的来说,JMS1.1规范是Java开发者在构建可扩展、高可用和解耦的企业级系统时的重要参考。通过理解并熟练运用JMS,开发...

    ActiveMQ订阅模式持久化实现

    **ActiveMQ订阅模式持久化实现** ...开发者可以通过运行和调试这些代码来理解和实践ActiveMQ的订阅模式持久化功能。理解并正确应用这些知识点,可以确保消息的可靠传递,满足高可用性和容错性的需求。

    JMS规范教程.docx

    - **JMSDeliveryMode**:定义消息的传递方式,通常有两种模式:非持久化(Non-Persistent)和持久化(Persistent)。持久化消息会在消息代理重启后仍然保留,而非持久化消息则不然。 - **消息体**:实际的数据内容,...

    JMS标准规范培训教程

    《JMS规范培训教程》这份文档很可能是深入理解JMS标准和实际应用的指南,它可能涵盖了JMS的基本概念、API用法、消息模型的比较、事务处理、消息持久化、故障恢复策略等内容。通过学习这份教程,开发者能够熟练地在...

    JMS消息队列机制及案例

    在实际应用中,开发者可以结合ActiveMQ的特性和JMS规范,构建高效、可靠的分布式消息系统,解决系统间的同步问题,提升系统性能和稳定性。通过深入理解和实践,我们可以掌握如何利用JMS和ActiveMQ实现异步处理、系统...

    JMS 使用 ActiveMQ 传送文件

    3. **jms规范教程.pdf** - 这是一个关于JMS规范的教程,可能详细讲解了JMS接口、消息类型(如点对点和发布/订阅模型)、消息队列和主题等核心概念,为理解JMS和ActiveMQ的工作原理提供了基础。 综上所述,这个主题...

    jms规范说明

    ### JMS规范详解 #### 一、JMS基础概览 **1.1 何为JMS(Java Message Service)?** JMS,即Java消息服务,是Java平台中的...通过对JMS规范的深入理解,开发者能够更有效地利用其特性,实现高效、可靠的消息通信。

    JMS(java消息服务学习必备)

    JMS提供者是实现JMS规范的服务商,例如ActiveMQ、RabbitMQ、IBM WebSphere MQ等。它们负责管理和路由消息,并提供API供开发者使用。 **3. 消息类型** JMS支持两种主要的消息类型:点对点(Point-to-Point,P2P)和...

    java实现的基于jms协议的消息队列中间件,源码!

    3. **消息队列**:作为存储和转发的媒介,消息队列保证了消息的顺序,并在必要时进行消息的持久化,以防系统故障导致数据丢失。 4. **事务支持**:JMS提供了事务处理能力,确保消息的原子性,即消息要么全部发送成功...

    ActiveMQ在JMS中的运用-深入浅出JMS

    ActiveMQ是Apache软件基金会开发的一个开源消息代理,它实现了JMS规范,提供了高性能、可伸缩和可靠的异步消息传递。ActiveMQ支持多种协议,包括OpenWire、AMQP、STOMP、MQTT和WSO2等,使其能够与其他平台和语言进行...

    ActiveMQ 中javax.jms的源码 javax.jms-sources-1.1.zip

    总之,javax.jms-sources-1.1.zip提供的ActiveMQ源码是理解JMS规范和ActiveMQ实现的重要资源。通过对这些源码的分析,开发者可以提升自己的JMS编程技能,解决实际问题,构建更健壮的分布式系统。

    jms1.1.jar.zip

    1. 持久化:JMS支持消息的持久化存储,即使在系统崩溃或网络故障后,未被消费的消息仍能被恢复。 2. 事务处理:Session可以设置为事务模式,允许一组消息作为一个事务来发送或接收,确保数据的一致性。 3. 服务质量...

    JMS 1.1 API JAVADOC 中文版 chm

    JMS 1.1 API是这个规范的一个版本,提供了在分布式环境中发送和接收消息的机制,它是Java应用程序与其他系统进行交互的重要工具,尤其在处理大量并发和数据传输时。 JMS 1.1 API文档主要包含了以下关键概念和接口:...

    JMS.rar_answers_jms

    1. **消息和消息队列**:JMS中的核心概念是消息,它是一个数据结构,包含要传递的信息。消息通过消息队列进行传输,队列是一种先进先出(FIFO)的数据结构,确保消息按照顺序被消费。 2. **消息生产者和消费者**:...

    JMS所需jar包

    ActiveMQ还支持多种消息模式,如点对点(Queue)和发布/订阅(Topic),并且具有丰富的特性,如持久化、事务、网络传输和集群。 总的来说,"JMS所需jar包"和"apache-activemq-5.11.1"的组合提供了一个完整的JMS实现...

    ActiveMQ -5.9和jms-1.1源码下载

    5. **理解JMS规范**:通过JMS-1.1源码,开发者能更深入地理解消息传递的规范,确保应用符合最佳实践。 在阅读源码之前,建议先熟悉ActiveMQ和JMS的基本概念和用法,以便更好地理解代码逻辑。同时,配合官方文档和...

Global site tag (gtag.js) - Google Analytics