`
haoran_10
  • 浏览: 444136 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

消息中间件(1)-JMS规范

阅读更多

一、什么是JMS,为什么需要它

(1)、消息中间件的定义
       指利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。消息中间件可以即支持同步方式,又支持异步方式。异步中间件比同步中间件具有更强的容错性,在系统故障时可以保证消息的正常传输。异步中间件技术又分为两类:广播方式和发布/订阅方式。由于发布/订阅方式可以指定哪种类型的用户可以接受哪种类型的消息,更加有针对性,事实上已成为异步中间件的非正式标准。
(2)、JMS定义
        从上个世纪90年代初,随着不同厂商消息中间件大量上市,消息中间件技术得到了长足的发展。目前,IBM和BEA的中间件产品在银行、证券、电信等高端行业,以及IT等行业中得到广泛应用。由于没有统一的规范和标准,基于消息中间件的应用不可移植,不同的消息中间件也不能互操作,这大大阻碍了消息中间件的发展。                   Java Message Service(JMS, Java消息服务)是SUN及其伙伴公司提出的旨在统一各种消息中间件系统接口的规范。它定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,它最主要的目的是允许Java应用程序访问现有的消息中间件。JMS规范没有指定在消息节点间所使用的通讯底层协议,来保证应用开发人员不用与其细节打交道,一个特定的JMS实现可能提供基于TCP/IP、HTTP、UDP或者其它的协议。目前许多厂商采用并实现了JMS API,现在,JMS产品能够为企业提供一套完整的消息传递功能,下面是一些比较流行的JMS商业软件和开源产品。
(3)、解决了什么问题:

        采用异步通信模式:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者也不必在接到消息后立即对发送者的请求进行处理;

        客户和服务对象生命周期的松耦合关系:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。

 

二、体系结构

整体结构如下:



(1)、ConnectionFactory

  连接工厂,JMS 用它创建连接Connection,一般设为单例模式,一旦创建,就一直运行在应用容器内

 

package javax.jms;
public interface ConnectionFactory {
    Connection createConnection() throws JMSException;//创建一个连接
    Connection createConnection(String userName, String password)//创建一个有密码的连接
        throws JMSException;
}

 

 

(2)、Connection

 

package javax.jms;
public interface Connection {
    Session createSession(boolean transacted, int acknowledgeMode)
        throws JMSException;
    String getClientID() throws JMSException; //唯一客户端ID
    void setClientID(String clientID) throws JMSException;
    ConnectionMetaData getMetaData() throws JMSException;
    ExceptionListener getExceptionListener() throws JMSException;
    void setExceptionListener(ExceptionListener listener) throws JMSException;
    void start() throws JMSException; //开启连接
    void stop() throws JMSException; //停止连接
    void close() throws JMSException; //关闭连接
    ConnectionConsumer createConnectionConsumer(
        Destination destination,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
    ConnectionConsumer createDurableConnectionConsumer(
        Topic topic,
        String subscriptionName,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
}

 JMS 客户端到JMS Provider 的连接,可以理解为数据库里的Connection

 

 

(3)、Session:一个发送或接收消息的线程

 

package javax.jms;
import java.io.Serializable;
public interface Session extends Runnable {
    static final int AUTO_ACKNOWLEDGE = 1; //四种模式
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;
    static final int SESSION_TRANSACTED = 0;
    BytesMessage createBytesMessage() throws JMSException; //创建消息
    MapMessage createMapMessage() throws JMSException;
    Message createMessage() throws JMSException;
    ObjectMessage createObjectMessage() throws JMSException;
    ObjectMessage createObjectMessage(Serializable object) throws JMSException;
    StreamMessage createStreamMessage() throws JMSException;
    TextMessage createTextMessage() throws JMSException;
    TextMessage createTextMessage(String text) throws JMSException;
    boolean getTransacted() throws JMSException;
    int getAcknowledgeMode() throws JMSException;
    void commit() throws JMSException;
    void rollback() throws JMSException;
    void close() throws JMSException;
    void recover() throws JMSException;
    MessageListener getMessageListener() throws JMSException;
    void setMessageListener(MessageListener listener) throws JMSException;
    public void run();
    MessageProducer createProducer(Destination destination) //创建消息生产者
        throws JMSException;
    MessageConsumer createConsumer(Destination destination)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector,
        boolean NoLocal)
        throws JMSException;
    Queue createQueue(String queueName) throws JMSException; //创建队列
    Topic createTopic(String topicName) throws JMSException; //创建主题
    TopicSubscriber createDurableSubscriber(Topic topic, String name)//创建主题订阅者
        throws JMSException;
    TopicSubscriber createDurableSubscriber(
        Topic topic,
        String name,
        String messageSelector,
        boolean noLocal)
        throws JMSException;
    QueueBrowser createBrowser(Queue queue) throws JMSException;
    QueueBrowser createBrowser(Queue queue, String messageSelector)
        throws JMSException;
    TemporaryQueue createTemporaryQueue() throws JMSException;
    TemporaryTopic createTemporaryTopic() throws JMSException;
    void unsubscribe(String name) throws JMSException;
}

 

所以,session具有创建消息,主题,队列,生产者,消费者功能。
(4)、MessageProducer Session 对象创建的用来发送消息的对象
(5)、Destination:消息的目的地,包括队列(PTP),主题(Pub/Sub
(6)、MessageConsumer Session 对象创建的用来接收消息的对象
(7)、Message:

JMS 消息由以下几部分组成:消息头,属性,消息体。

   消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

   属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。

  消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

 

三、两种消息传递模型

JMS Parent

PTP Domain

Pub/Sub Domain

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

Queue

Topic

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber

(1)、点对点模型(PTP) 
        点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发动到由某个名字标识的特定消费者。这个名字实际上对应于消息服务中的一个队列(Queue),在消息传动给消费者之前它被存储在这个队列中。队列可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。
(2)、发布-订阅模型(Pub/Sub)
        发布-订阅模型用称为主题(topic)的内容分层结构代替了PTP模型中的惟一目的地,发送应用程序发布自己的消息,指出消息描述的是有关分层结构中的一个主题的信息。希望接收这些消息的应用程序订阅了这个主题。订阅包含子主题的分层结构中的主题的订阅者可以接收该主题和其子主题发表的所有消息
三、原生态JMS编程实践
 

广义上说,一个JMS应用是几个JMS 客户端交换消息,开发JMS客户端应用由以下几步构成

(1)、用JNDI 得到ConnectionFactory对象;

(2)、用JNDI 得到目标队列或主题对象,即Destination对象;

(3)、用ConnectionFactory创建Connection 对象;

(4)、用Connection对象创建一个或多个JMS Session;

(5)、用Session 和Destination 创建MessageProducer和MessageConsumer;

(6)、通知Connection 开始传递消息。

消费生产者
import java.io.*;
import javax.jms.*;
import javax.naming.*;
 
public class Sender {
 
    public static void main(String[] args) {
        new Sender().send();
    }
 
    public void send() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            //Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
 
            //Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
 
            //Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = session.createProducer(queue);
 
            //Send messages
            String messageText = null;
            while (true) {
                System.out.println("Enter message to send or 'quit':");
                messageText = reader.readLine();
                if ("quit".equals(messageText))
                    break;
                TextMessage message = session.createTextMessage(messageText);
                sender.send(message);
            }
 
            //Exit
            System.out.println("Exiting...");
            reader.close();
            connection.close();
            System.out.println("Goodbye!");
 
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
消息消费者 
import java.io.*;
import javax.jms.*;
import javax.naming.*;
 
public class Receiver implements MessageListener {
    private boolean stop = false;
    public static void main(String[] args) {
        new Receiver().receive();
    }
 
    public void receive() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            //Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
            reader.close();
 
            //Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
 
            //Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer receiver = session.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
 
            //Wait for stop
            while (!stop) {
                Thread.sleep(1000);
            }
 
            //Exit
            System.out.println("Exiting...");
            connection.close();
            System.out.println("Goodbye!");
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
 
    public void onMessage(Message message) {
        try {
            String msgText = ((TextMessage) message).getText();
            System.out.println(msgText);
            if ("stop".equals(msgText))
                stop = true;
        } catch (JMSException e) {
            e.printStackTrace();
            stop = true;
        }
    }
}
 
四、总结
目前许多厂商采用并实现了JMS API,比如Active MQ

Active MQ是一个基于Apcache 2.0 licenced发布,开放源码的JMS产品。其特点为:

(1)、提供点到点消息模式和发布/订阅消息模式;

(2)、支持JBoss、Geronimo等开源应用服务器,支持Spring框架的消息驱动;

(3)、新增了一个P2P传输层,可以用于创建可靠的P2P JMS网络连接;

(4)、拥有消息持久化、事务、集群支持等JMS基础设施服务。

下篇学习Active MQ

 

  • 大小: 32.1 KB
2
0
分享到:
评论

相关推荐

    java消息中间件教程-activemq

    - **3-1 JMS规范**:Java消息服务(Java Message Service, JMS)是Java平台中关于面向消息中间件(MOM)的标准客户端接口,它的主要目的是让Java应用程序能够与实现JMS的应用程序服务器通信。JMS定义了两种消息模型...

    消息中间件原理及JMS简介

    本文主要探讨了消息中间件的原理,以及Java消息服务(JMS)这一规范。 中间件是一种复用型软件,它位于操作系统、网络和数据库之间,为上层应用软件提供运行和开发的环境。中间件的主要任务是简化分布式系统的构建...

    消息中间件-- -activemq项目示例

    首先,ActiveMQ基于Java Message Service (JMS) 规范,提供了多种协议支持,如OpenWire、STOMP、AMQP和MQTT等,这使得它能与各种语言和平台无缝集成。在服务器环境中,ActiveMQ能够帮助处理高并发场景下的消息传递,...

    消息中间件和JMS消息服务.pdf

    综上所述,消息中间件及其JMS规范为分布式系统提供了强大的通信机制。通过异步、解耦和一对多的特性,消息中间件极大地增强了系统的灵活性、扩展性和可靠性。对于开发人员来说,理解和掌握JMS接口和消息模型是构建...

    消息中间件概览,JMS规范,AMQP协议

    JMS规范详情 AMQP协议详情 RocketMQ RabbitMQ Kafka ActiveMQ ......对比

    activemq中间件视频 jms规范

    通过这个视频系列,你不仅能够掌握ActiveMQ的基本操作,还能深入理解JMS规范,从而在实际项目中更有效地利用消息中间件进行解耦和异步通信。此外,了解ActiveMQ的性能调优和故障排查技巧也是提升系统稳定性的关键,...

    基于JMS的消息中间件的实现-论文

    关键词的选取充分反映了论文的主题内容:消息中间件、JMS规范、JMX分布式管理框架。本文不仅展示了JMS与JMX结合的创新应用,而且为消息中间件的开发提供了理论支持和实践指导。对于理解和实现高效的消息服务系统具有...

    消息中间件和JMS原理

    【消息中间件和JMS原理】是分布式系统中重要的组件,它们主要解决了传统RPC(Remote Procedure Call)中间件...通过选择合适的消息中间件产品并遵循JMS规范,开发者可以构建出高可用、高性能和高度可扩展的分布式系统。

    java_JMS_消息中间件_规范教程

    JMS规范的核心概念包括消息生产者(Message Producers)、消息消费者(Message Consumers)和消息代理(Message Brokers)。消息生产者负责创建并发送消息,而消息消费者则负责接收和处理这些消息。消息代理是中间的...

    activeMQ-jms

    Apache ActiveMQ是业界广泛使用的一款开源消息中间件,它基于Java Message Service (JMS) 规范,提供了高效、可靠的异步通信能力。在“activeMQ-jms”这个压缩包中,我们通常会找到Apache ActiveMQ 5.13.4版本的相关...

    JMS中间件ActiveMQ介绍

    Java Message Service(JMS)是Sun Microsystems提出的一种规范,用于统一不同的消息传递中间件(Message-Oriented Middleware, MOM)系统的接口。JMS为开发者提供了一个与特定平台无关的API,使得开发人员能够更...

    ActiveMq-JMS简单实例

    **ActiveMQ-JMS简单实例** ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它实现了...通过阅读文档,学习JMS规范,以及实践代码示例,你可以深入理解消息中间件的工作原理,并掌握其在企业级应用中的应用。

    消息中间件在分布式系统中的作用介绍

    常见的实现JMS规范的消息中间件产品包括ActiveMQ、RocketMQ、RabbitMQ、HornetQ等。每种消息中间件各有其特点和适用场景,开发者可以根据项目的具体需求和特性来选择合适的中间件产品。 消息中间件在分布式系统中的...

    JMS+activeMQ消息中间件

    **ActiveMQ**是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,为应用程序提供了一个中间件,允许应用程序之间进行异步的消息通信。ActiveMQ支持多种协议,如OpenWire、AMQP、STOMP、MQTT等,适用于多种...

    消息中间件与JMS.pdf

    Java消息服务(JMS)是由Sun Microsystems提出的一种消息中间件接口规范,旨在统一不同消息中间件系统的接口,提高应用程序的可移植性和互操作性。 #### 四、JMS介绍 **4.1 JMS简介** JMS是Java平台上的消息中间件...

    activeMQ-JMS实例

    ActiveMQ是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,允许应用程序之间进行异步通信。JMS是一种标准接口,用于Java平台上的消息传递,提供了可靠的消息传递机制,确保消息的顺序和持久性,同时也...

    sun-jms.rar_jms_message queue

    标题中的"sun-jms.rar_jms_message queue"表明我们关注的是Sun Microsystems实现的JMS规范,以及Message Queue这一特定的消息中间件。Sun Microsystems的Message Queue是早期Oracle公司提供的一个JMS兼容的消息代理...

    消息中间件和jms消息服务

    消息中间件和JMS消息服务在IT行业中扮演着至关重要的角色,尤其是在构建大型分布式系统时。传统的RPC中间件如CORBA、DCOM和RMI虽然广泛应用,但它们在处理复杂性和同步通信方面存在局限。为了解决这些问题,面向消息...

    TongLINK/Q消息中间件(说明)

    ### TongLINK/Q 消息中间件相关知识点 #### 一、产品概述 TongLINK/Q 是由北京东方通科技发展有限责任公司开发的一款消息中间件,主要用于构建高效、可靠的分布式应用程序。该中间件提供了多种消息传递模式,包括点...

Global site tag (gtag.js) - Google Analytics