`
somebody_hjh
  • 浏览: 182074 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ActiveMQ分享(一)JMS简介

    博客分类:
  • Java
阅读更多
一、概述

  Message,即消息。人与人之间通过消息传递信息。言语、眼神、肢体动作都可被视为消息体。当然还有我们经常用到的邮件、短信。计算机系统也由消息来主导运行。每一条指令的执行,每一个数据包的传递。软件系统间的合作也不例外,消息告诉各个系统应该怎样协作。事件处理机制,也是消息传送的过程。消息无处不在。

  消息分为同步消息和异步消息。同步消息在接收到对方的返回前,需要挂起,直到返回或超时。异步消息只需要发送消息,不需要对方系统的立即反馈。

  同步消息如java RPC调用,同步调用依赖于被调用方,如果被调用方失败或网络错误,那么程序就没办法继续执行下去,造成一个系统最薄弱的环节依赖于对方系统。而多个系统通过同步调用方式耦合在一起的时候,那么可靠性取决于最薄弱的一方系统。而异步调用能增强一个系统的健壮性。当然,不是任何情况都适合异步调用,还是那句话,能异步的地方,尽量异步。

  异步消息,如同一个邮箱系统,我们把信件丢入邮桶,邮递员会更具上面的地址,送达到这封信要去的地方。邮箱和信件格式由邮局提供定义,比如邮箱需要有一个口子投递邮件,邮件需要有地址,邮政编码等等。而这些邮箱具体的加工和制作均交由各自的厂商来完成。
在java消息领域,我们也有一个称为消息中间件的东西,来提供这样一个服务。消息的发送、消费接口、消息体的格式等都由JMS来定义,而具体的实现由各个消息中间件厂商来实现。JMS是sun公司对于消息中间件的一个规范。对java领域里的消息起到举足轻重的作用。以前的消息交互,均各自实现一套格式,如同一个国家的人都用不同的方言跟另外来自不同省份的人交流一样。自从规范了普通话,我们的交流成本降低了。这也正如JMS规范在整个java消息领域的作用。

二、JMS简介

  JMS1.1规范定义了一些概念和一组API,可以使得在我们利用消息系统的过程中,不依赖于各个厂商的具体实现,便能写出消息代码。由于不依赖具体厂商实现,这样的代码有很好的移植性。
JMS1.1定义了的部分概念:
1、JMS客户端:接收或发送消息的java系统
2、JMS消息体:系统间发送的消息体
3、JMS提供商:JMS规范实现厂商
4、JMS管理对象:预先配置好的用于JMS客户端的JMS对象。如ConnectionFactory跟JMS服务端的连接工厂,Destination 接收和发送消息的目标地址等
5、JMS Domain:JMS定义了两种域模型,一种是PTP(point-to-point)即点对点消息传输模型,一种是pub/sub(publish-subscribe)即发布订阅模型。
  PTP通过一个先进先出的queue实现。很多人在这个PTP概念上有所误解,所谓点对点不是指生产者和消费者只有一个。PTP如下图所示:



  我们可以看到,一个或多个生产者发送消息,消息m2先抵达了queue,然后m1也发出了,并一同存在于一个先进先出的queue里面。消费者也存在一个或多个,对queue里的消息进行消费。但消息被随机的一个消费者消费且仅消费一次。

在pub/sub消息模型中,消息被广播给所有订阅者。如下图:




JMS1.1定义了如下接口:


JMS通用接口JMS PTP接口JMS pub/sub 接口描述
ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory创建与JMS服务的连接
ConnectionQueueConnection TopicConnectionJMS服务的连接,可创建Session
DestinationQueueTopic发送接收消息的目标,Queue和Topic均可视为一种Destination
SessionQueueSessionTopicSession Session
MessageProducerQueueSender TopicPublisher 消息发送者
MessageConsumerQueueReceiver、QueueBrowserTopicSubscriber消息接收者/浏览者/订阅者



接口关系如下:





  可以看到一个JMS应用的发送端的标准流程是:创建连接工厂>创建连接>创建session>创建发送者>创建消息体>发送消息到Destination(queue或topic)。

  接收端则为:创建连接工厂>创建连接>创建session>创建接收者>创建消息监听器监听某Destination的消息>获取消息并执行业务逻辑

下面展示了JMS的编码模板:

发送端:

   
public class QueueSender {
        public static void main(String[] args) throws JMSException{
            ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");//这里为简便,依赖ActiveMQ的JMS实现。现实程序中可以通过jndi查找方式查找factory,这样彻底避免关联特定JMS实现者。并拥有最大的可移植性。
            Connection connection=factory.createConnection();
            Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            Message message=session.createTextMessage("Hello World!"+new Date().getTime());
            Queue queue=new ActiveMQQueue("queue.somebody");//同上,这里依赖了ActiveMQ的JMS实现,现实中可以采用jndi查找方式获得Queue。
            MessageProducer producer=session.createProducer(queue);
            producer.send(message);
            session.close();
            connection.close();
        }
    }


接收端:
       
public class QueueReceiver {
            public static void main(String[] args) throws JMSException{
                ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");
                Connection connection=factory.createConnection();
                Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
                Queue queue=new ActiveMQQueue("queue.somebody");
                MessageConsumer receiver=session.createConsumer(queue);//创建两个接收者,同时消费同一个queue的消息。queue里的消息派发且仅派发一次给唯一一个消费者。
                MessageConsumer receiver2=session.createConsumer(queue);
                receiver.setMessageListener(new QueueMessageListener("1"));
                receiver2.setMessageListener(new QueueMessageListener("2"));
                connection.start();
            }
        }


消息侦听者:
   
public class QueueMessageListener implements MessageListener{
    private String num;
    public QueueMessageListener(String num){
        super();
        this.num=num;
    }
    public QueueMessageListener(){
        super();
    }
    public void onMessage(Message message) {
        System.out.println(message.toString()+num);
    }


  我们安装了ApacheMQ之后,启动本地MQ服务,便能测试这几个类的执行方式,观察执行过程。我们可以发现消息仅被消费了一次,JMS提供商必须保证一条消息不会重复被消费,当然更不能丢失。

  在PTP下,我们建立一个queue以后,如果没有消费者连入该queue,那么消息生产者发送的消息将堵塞在queue里面,直到有消费者来消费。如果消费者出现故障,不再连入queue,消息还会保存在queue里面,等待下次消费者的连入,再进行消费。在pub/sub模型下,如果消费者出现故障,那么所有的消息,将不在为之保留,下次连接之时,只能消费此时生产者发送的消息。JMS于是定义了另一种订阅者,叫Durable Subscription(持久订阅者),该订阅者在第一次连上topic后,就注册了该topic的消息,消息会在消费者不可用的情况下为之保留消息,并在其再次连上topic后,重推消息给消费者。

  理解了JMS中的两种消息模型以后,我们来说说JMS中的消息体格式。JMS对消息体定为三个部分 1)head 消息头信息 2)properties 消息属性值 3)body 消息内容。如下图所示:



其含义大致解释如下:

头类型描述设置方
JMSDestination描述该消息发往的目的地在发送方法中设定
JMSDeliveryModeNON_PERSISTENT 非持久化 表示消息发往JMS消息服务器之后,保存在内存中,不做持久化;PERSISTENT 持久化 消息发往JMS消息服务器之后,持久化数据。以保证消息服务器拓机造成的消息丢失发送方法中设定
JMSExpiration 消息过期时间。消费者在发送消息时,可设定消息的time-to-live时间,如producer.setTimeToLive(10000),在消息发送后,该时间保留在消息的JMSExpiration字段中,如果在指定的这段时间内消息未被消费,该消息将会被丢弃。在发送方法中设定
JMSPriority 消息优先级,JMS把消息分为10个等级,0-4为普通优先级,5-9为加快优先级,ActiveMQ中默认的消息优先级为4在发送方法中设定
JMSMessageId消息唯一性ID,必须以“ID:”为前缀在发送方法中设定
JMSTimestamp消息发送时间,表示消息的发送时间点,而非传送时间在发送方法中设定
JMSCorrelationID客户端
JMSReplyTo客户端
JMSType客户端
JMSRedeliveredJMS提供商
属性值含义
properties
消息体含义
body消息体分为1:StreamMessage2:MapMessage 3:TextMessage 4:ObjectMessage 5:BytesMessage


再谈消息可靠性

    在上面,谈及消息体格式定义中,有个字段 JMSDeliveryMode用来表示该消息发送后,JMS提供商应该怎么处理消息。PERSISTENT(持久化)的消息在JMS服务器中持久化。接收端如果采用点对点的queue方式或者Durable Subscription(持久订阅者)方式,那么消息可保证只且只有一次被成功接收。NON_PERSISTENT(非持久化)的消息在JMS服务器关闭或宕机时,消息丢失。根据发送端和接收端采用的方式,列出如下可靠性表格,以作参考。

引用
注意:以下的可靠性不包括JMS服务器由于资源关系,造成的消息不能持久化等因素引起的不可靠(该类不可靠应该是JMS提供商或硬件引起的的资源和处理能力的极限问题,应该由管理人员解决)。也不包括消息由于超时时间造成的销毁丢失。

消息发送端消息接收端可靠性及因素
PERSISTENTqueue receiver/durable subscriber消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。
PERSISTENTnon-durable subscriber最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。
NON_PERSISTENTqueue receiver/durable subscriber最多消费一次。这是由于服务器的宕机会造成消息丢失
NON_PERSISTENTnon-durable subscriber最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定


消息的通知确认
在客户端接收了消息之后,JMS服务怎样有效确认消息是否已经被客户端接收呢?Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);这段代码创建一个非事务性的session,并采用auto_acknowledge方式通知JMS服务器。如果采用事务性session时,通知会伴随session的commit/rollback同时发送通知。在我们采用非事务session时,有三种通知方式。
通知方式效果
DUPS_OK_ACKNOWLEDGE session延迟通知。如果JMS服务器宕机,会造成重复消息的情况。程序必须保证处理重复消息而不引起程序逻辑的混乱。
AUTO_ACKNOWLEDGE 当receive或MessageListener方法成功返回后自动通知。
CLIENT_ACKNOWLEDGE客户端调用消息的acknowledge方法通知




JMS就先简要的介绍到这里,详细资料请参看JMS1.1规范。关于ActiveMQ的文章文章请继续关注 ActiveMQ分享(二)




  • 大小: 26.9 KB
  • 大小: 29.2 KB
  • 大小: 26.6 KB
  • 大小: 5 KB
13
3
分享到:
评论
8 楼 yanjingan 2016-10-31  
通俗易懂 理解的很深 好文章!
7 楼 u012049463 2013-11-28  
谢谢,如果一开始就看这篇文章,又可以少走多少弯路。很具体也很通俗易懂,转走.../cy
6 楼 chenzhou123520 2013-07-28  
正准备学ActiveMQ这块,文章写得很详细啊,通俗易懂。感谢楼主分享,赞一个
5 楼 jingjiniao0118 2013-01-25  
很详尽,谢谢分享!!!
4 楼 763691 2013-01-11  
思路很清晰,谢谢分享
3 楼 coolinc 2012-12-03  
这个介绍好
2 楼 DDT_123456 2012-04-08  
感谢这么详尽的分享
1 楼 rockyeah 2010-08-23  
期待后续的分享.

相关推荐

    spring整合jms+activemq

    ActivemQ是Apache软件基金会的一个项目,它实现了JMS规范,提供了一个高效、可靠的中间件服务,用于处理消息队列。本文将深入探讨如何在Spring 3.0中整合JMS与ActivemQ,以及它们在实际应用中的关键知识点。 首先,...

    ActiveMQ -5.9和jms-1.1源码下载

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它实现了Java消息服务(JMS)标准,为分布式系统提供高效、可靠的消息传递。ActiveMQ-5.9是该软件的一个版本,而JMS-1.1是Java消息服务的规范版本,用于定义...

    activemq jms

    标题中的"ActiveMQ JMS"指的是Apache ActiveMQ,它是一个开源的消息中间件,实现了Java消息服务(Java Message Service,简称JMS)。ActiveMQ是Apache软件基金会的一个项目,它提供了多种协议的支持,包括JMS、AMQP...

    activemq activeMq笔记

    Apache ActiveMQ 是一款非常流行的开源消息中间件,它支持 Java 消息服务 (JMS) 标准,并提供了多种高级功能,例如持久化、集群、故障转移等。ActiveMQ 能够帮助开发者实现解耦、可靠的消息传输以及高性能的应用程序...

    分享一些ActiveMQ的资料

    ActiveMQ是中国最流行的开源消息中间件之一,它基于开放标准的Java消息服务(JMS)规范,用于在分布式系统中高效地传输数据。这个压缩包包含了一系列关于ActiveMQ的学习资源,包括API参考手册、笔记、整合教程以及...

    jms Spring+ActiveMQ 5.4.2

    描述中的链接指向了一篇博客文章,虽然具体内容未给出,但可以推测博主分享了关于如何在Spring中配置和使用ActiveMQ 5.4.2的教程或经验。通常,这样的内容会涉及以下几个关键知识点: 1. **Spring JMS模块**:...

    JMS之ActiveMQ工具类分享(关于同步回执和异步回执)(新)

    ActiveMQ工具类分享(关于同步回执和异步回执),本穷屌CSDN没币下载资源了,莫怪莫怪。后面会调整回来。。。 资源描述具体看文章啦。http://blog.csdn.net/kunloz520/article/details/78830656

    ActiveMQ与spring集成实例之使用Maven构建

    1. **Apache ActiveMQ**:ActiveMQ是Apache软件基金会的一个开源项目,它是一个基于Java消息服务(JMS)规范的消息中间件。ActiveMQ允许应用程序之间通过消息传递进行异步通信,提高系统的可扩展性和解耦性。 2. **...

    ActiveMQwindow环境下安装包

    Apache ActiveMQ是一款开源的消息中间件,它遵循Java Message Service (JMS) 规范,用于在分布式系统中高效地处理消息传递。在Windows环境下安装ActiveMQ,我们可以按照以下步骤进行: 1. **下载与解压**: 首先,...

    activemq 5.15.15源码包,源码包

    源码包的分享对于开发者来说是一个宝贵的学习和研究资源,可以深入理解其内部机制和工作原理。 ActiveMQ的核心功能包括: 1. **多协议支持**:ActiveMQ 支持多种通信协议,如OpenWire、STOMP、AMQP、MQTT和WS-MQ,...

    ActiveMQ问题解决记录

    在IT行业中,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它是Java Message Service (JMS) 的实现,允许应用程序之间进行异步通信。这篇博客"ActiveMQ问题解决记录"可能涵盖了作者在使用ActiveMQ过程...

    ActiveMQ 中文版参考手册

    ActiveMQ 是一个开源的 JMS 实现,以其高性能和可靠性而受到广泛欢迎。本指南涵盖了 JMS 的基本概念,包括点对点(PTP)和发布/订阅(Pub/Sub)消息传递模式,以及 JMS 连接工厂、连接、会话、目的地、消息生产者和...

    activemq-5.16.2-源码包,源码包资源

    "资源达人分享计划"的标签表明这是一个社区共享的资源,意味着你可以在社区中寻求帮助,或者分享你对ActiveMQ的理解和经验。研究和理解ActiveMQ的源码不仅能提升你的消息中间件知识,还能加深对Java编程、分布式系统...

    apache-activemq-5.16.0-bin-windows.zip----windows版用

    分享-windows版最新的apache-activemq-5.16.0 。 使用activeMQ来完成jms的发送,必须要下载activeMQ,然后再本机安装,并且启动activeMQ的服务才行。在官网下载完成之后,运行bin目录下面的activemq.bat,将activeMQ...

    activeMQ 示例

    在描述中提到的“博文链接:https://happyzj.iteye.com/blog/935742”,这可能是一个博主分享的关于ActiveMQ详细使用的文章,虽然具体内容没有提供,但通常这类博客会包含安装步骤、配置教程、示例代码以及常见问题...

    ActiveMQ in action.pdf 英文版

    ActiveMQ是Java消息服务(JMS)的一个实现,广泛用于分布式系统中的异步通信和解耦。这本书英文版的易读性使得即便是对英文不太熟练的读者也能从中受益。 1. **ActiveMQ基础** - **什么是ActiveMQ**:ActiveMQ是...

    apache-activemq-5.15.15二进制包,安装包

    Apache ActiveMQ是世界上最流行的开源消息代理和集成模式服务器,它基于Java Message Service (JMS) 规范。这个“apache-activemq-5.15.15二进制包,安装包”包含了运行和配置ActiveMQ所需的所有组件,方便用户在...

    SpringBoot整合ActiveMQ完整源码分享给需要的同学

    ActiveMQ是Apache软件基金会开发的一款开源消息代理,支持多种消息协议,如OpenWire、AMQP、STOMP等。Spring框架则为Java应用提供了全面的基础设施支持,而Spring Boot则简化了Spring应用的初始搭建以及开发过程。本...

    activemq中间件视频教程

    - **性能优化**:分享提高ActiveMQ性能的技巧,如缓存策略、消息大小控制等。 - **实际应用场景**:通过具体案例展示ActiveMQ在微服务、大数据、物联网等领域的应用。 本“ActiveMQ中间件视频教程”旨在为初学者...

Global site tag (gtag.js) - Google Analytics