论坛首页 Java企业应用论坛

JMS规范介绍(1)JMS消息

浏览 2140 次
精华帖 (0) :: 良好帖 (1) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-08-23   最后修改:2011-08-23

[注]:本文与黄金档 上的文章同步,详见http://www.goldendoc.org/2011/08/jms_spec_message/

 

JMS ,即 Java Message Service ,它为 Java 应用程序提供了一种通用的用于创建、发送、接收以及读取消息的方式;

 

JMS 体系架构

1、 JMS Provider

面向消息中间件的, JMS 接口的一个实现。提供者可以是 Java 平台的 JMS 实现,也可以是非 Java 平台的面向消息中间件的适配器;

2、 JMS Client

生产或消费基于消息的 Java 的应用程序或对象;

3、 JMS Producer

创建并发送消息的 JMS Client

4、 JMS Consumer

接收消息的 JMS Client

5、 JMS Message

包括可以在 JMS Client 之间传递的数据的对象;

6、 JMS Queue

一个容纳那些被发送的等待阅读的消息的区域;
7 JMS Topic

一种支持发送消息给多个订阅者的机制;

 

JMS 消息

JMS 消息由以下三部分组成:

1 消息头 :所有消息的消息头都具体相同的字段,用于 JMS Client 以及 JMS Provider 对它们进行区别以及进行消息路由;

下面分别对几个重要的消息头字段及其作用和含义进行说明;

1) JMSDestination

消息发送的目的地(队列或主题);创建消息时可以设置 JMSDestination ,但是在发送完成时其值会更新为发送方所指定的 JMSDestination ,也就是说发送前该字段会被忽略;当消息被消费时,该字段的值与在它被发送时被设置的值是相同的;

如下面的例子(文中的例子都是基于 Apache Active MQ ):

 

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建2个目的地
Destination destination = session.createQueue("JMS.DEMO");
Destination destination2 = session.createQueue("JMS.DEMO2");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
// 设置消息的目的地为destination2
message.setJMSDestination(destination2);

// 发送消息
publisher.send(message);

System.out.println(message.getJMSDestination());
 

代码中,通过 message.setJMSDestination(destination2); 设置了 message JMSDestination 消息头属性值,我们再看看其输出结果:

 

queue://JMS.DEMO
 

通过这个例子可以看出,虽然在发送前设置了消息的目的地,但是发送后消息的目的地被重置了;

 

2) JMSDeliveryMode

指明消息的传输模式,有两种:

DeliveryMode.PERSISTENT :保证消息仅传一次, JMS Provider 服务停止后消息不会丢失;

DeliveryMode.NON_PERSISTENT :消息最多传一次,消息会因 JMS Provider 停止后丢失;

JMSDestination 一样,在发送前设置的会被忽略;

看下面的例子:

 

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送PERSISTENT消息
publisher.send(session.createTextMessage("PERSISTENT MESSAGE"));

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 发送PERSISTENT消息
publisher.send(session.createTextMessage("NON_PERSISTENT MESSAGE"));
 

例子中分别发送了一条 PERSISTENT 的消息和一条 NON_PERSISTENT 的消息;当 Active MQ 重启后,启动消费端,收到的消息如下:

 

PERSISTENT MESSAGE
 

该例子说明,在 JMS Provider 重启后, NON_PERSISTENT 消息丢失了,而 PERSISTENT 消息能正常被消费者消费;

 

3) JMSMessageID

JMS Provider 指定的消息的唯一标识符;同上面的字段一样,在发送前设置的会被忽略,在发送完成时,由 JMS Provider 重置该字段;

 

4) JMSReplyTo

发送端在发送消息时,可以指定该属性(为一个 JMSDestination ),表示期望收到客户端的响应;是否响应由消费端决定;

如下面的例子:

发送端:

 

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");
Destination destination2 = session.createQueue("JMS.DEMO3");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
message.setJMSReplyTo(destination2);
// 发送消息
publisher.send(message);
 

接收端(可以根据情况决定是否需要回复):

 

public void onMessage(Message message) {
    try {
        System.out.println("Receive message: " + message);
        if (message.getJMSReplyTo() != null) {
            session.createProducer(message.getJMSReplyTo()).send(session.createTextMessage("This is a reply to"
                                                                                           + message.getJMSReplyTo()));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 

5) JMSRedelivered

当消费者收到带有 JMSRedelivered 的消息头时,表明该消息在过去传输过但没有被确认;

JMS Provider 必须对该字段进行设置,当为 true 时即告知消费者该消息是重传的,消费者需要自行处理重复的消息;

 

6) JMSExpiration

消息的过期时间,其值为当前时间加上存活时间(毫秒);当存活时间设置为 0 时,该字段的值也被设置为 0 ,表示永不过期;

消费端在一般情况下都不会接收到过期的消息,但 JMS Provider 并不保证这一点;

下面的例子说明了如何设置消息的过期时间:

 

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
// 发送消息
publisher.setTimeToLive(5000);
publisher.send(message);
 

7) JMSPriority

消息的优先级, 0 代表最低优先级, 9 代表最高优先级;

一般 0~4 为普通优先级, 5~9 为加快优先级;

JMS 规范里并没有要求 JMS Provider 严格按这个优先级来实现,但是尽可能实现加快优先级消息的传输在普通消息的前面;

JMSDestination 一样,该字段在发送前被忽略,在发送完成时重置;

 

2 消息属性 :除了前面提到的消息头以外, JMS 消息还提供了对“属性值对”的支持,以对消息头进行扩展;消息属性主要用于消息选择器 (message selector 详见下文 )

1) 属性名:

属性名必须服务消息选择器的命名规则;

2) 属性值:

可以是基本类型及其对象类型以及 Map List String

下面的例子中,消息带 HashMap 的属性:

 

 

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
// 发送消息
message.setObjectProperty("myProp", new HashMap() {

    {
        this.put("key1", "value1");
        this.put("key2", "value2");
    }
});
publisher.send(message);
 

3) 清除属性:

JMS 不能清除单个属性,但可以通过 Message.clearProperties() 方法清除所有消息属性;

 

3 消息体 JMS 提供了 5 种类型的消息体:

1) StreamMessage :消息体是 Java 流,写入和读出都是顺序的;

2) MapMessage :消息体包含 key-value 对, key String value 为基本类型,可以通过迭代器访问;

3) TextMessage :消息体是 String

4) ObjectMessage :消息体是可序列化的 Java 对象;

5) BytesMessage :消息体是字节数组;

 

可以通过 message.clearBody() 来清除消息体;但在消费端,消息体是只读的,针对消息的写操作都会抛出 MessageNotWritableException 异常;

论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics