`
m635674608
  • 浏览: 5027258 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

架构设计:系统间通信(20)——MQ:消息协议(下)

 
阅读更多

篇文章中我们重点讨论了“协议”的重要性,并为各位读者介绍了Stomp协议和XMPP协议。这两种协议是消息队列中两种不同使用场景下的典型代表。本文主要接续上文的篇幅,继续讨论消息队列中另一种典型协议:AMQP协议。

3-3、AMQP协议

AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议)。目前AMQP协议的版本为 Version 1.0,这个协议标准在2014年通过了国际标准组织 (ISO) 和国际电工委员会 (IEC) 的投票,成为了新的 ISO 和 IEC 国际化标准。目前支持AMQP的软件厂商包括:

这里写图片描述

3-3-1、协议概览

在网络上讲解AQMP协议的文章已经有很多了,您可以在百度、Google、必应上搜索关键字‘AMQP’,就会出现很多相关文章。虽然文章数量比较多,但是却鲜有质量过硬的文章(当然除了AMQP官网 http://www.amqp.org/ 的协议说明文档)。本小节的内容我试图在能力所及的范围内,为各位读者将AMQP协议的核心要点讲清楚。为了达到这个目的,首先将AMQP协议的原理用下图进行一个全面呈现,然后在详细讲解图中的每一个要点:

这里写图片描述

从上图我们可以看到AMQP协议的各个组成部分:

  • AMQP协议中的元素包括:Message(消息体)、Producer(消息生产者)、Consumer(消息消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等

  • 由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的客户端,他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义。

  • 一个Broker可以创建多个Virtual Host(虚拟主机),我们将讨论的Exchange和Queue都是虚拟机中的工作元素(还有User元素)。注意,如果AMQP是由多个Broker构成的集群提供服务,那么一个Virtual Host也可以由多个Broker共同构成。

  • Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。但是有了Connection后客户端还不能和服务器通信,在Connection之上客户端会创建Channel,连接到Virtual Host或者Queue上,这样客户端才能向Exchange发送消息或者从Queue接受消息。一个Connection上允许存在多个Channel,只有Channel中能够发送/接受消息

  • Exchange元素是AMQP协议中的交换机,Exchange可以绑定多个Queue也可以同时绑定其他Exchange。消息通过Exchange时,会按照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中

那么AMQP消息在这个结构中是如何通过Producer发出,又经过Broker最后到达Consumer的呢?请看下图:

这里写图片描述

  1. 在Producer(消息生产者)客户端建立了Channel后,就建立了到Broker上Virtual Host的连接。接下来Producer就可以向这个Virtual Host中的Exchange发送消息了。

  2. Exchange(交换机)能够处理消息的前提是:它至少已经和某个Queue或者另外的Exchange形成了绑定关系,并设置好了到这些Queue和Excahnge的Routing(路由规则)。Excahnge中的Routing有三种模式,我们随后会讲到。在Exchange收到消息后,会根据设置的Routing(路由规则),将消息发送到符合要求的Queue或者Exchange中(路由规则还会和Message中的Routing Key属性配合使用)。

  3. Queue收到消息后,可能会进行如下的处理:如果当前没有Consumer的Channel连接到这个Queue,那么Queue将会把这条消息进行存储直到有Channel被创建(AMQP协议的不同实现产品中,存储方式又不尽相同);如果已经有Channel连接到这个Queue,那么消息将会按顺序被发送给这个Channel。

  4. Consumer收到消息后,就可以进行消息的处理了。但是整个消息传递的过程还没有完成:视设置情况,Consumer在完成某一条消息的处理后,将需要手动的发送一条ACK消息给对应的Queue(当然您可以设置为自动发送,或者无需发送)。Queue在收到这条ACK信息后,才会认为这条消息处理成功,并将这条消息从Queue中移除;如果在对应的Channel断开后,Queue都没有这条消息的ACK信息,这条消息将会重新被发送给另外的Channel。当然,您还可以发送NACK信息,这样这条消息将会立即归队,并发送给另外的Channel

3-3-2、Message(消息体)

通过上一小节的描述,我们可以看到AMQP协议中消息的处理规则和Stomp协议中消息的处理规则有类似之处,比如对ACK、NACK的使用。但明显不同的地方还是很多,例如AMQP中Exchange元素提供的Routing路由规则,这显然比Stomp协议中直接发送给Queue要灵活得多。

为了支持AMQP协议中的这些规则,AMQP协议中的消息也必须有特定的格式,实际上AMQP协议要比Stomp协议复杂得多。下面我们就根据ISO/IEC发布的AMQP Version 1.0标准文档,来讨论一下AMQP协议中的消息格式。

首先要说明的是目前国内多个技术站点,详细介绍AMQP消息格式的文章本来就不多(不包括那些聊聊几笔的转发),而且基本上都没有详细讲解格式本身,只是粗略说明了AMQP消息采用二进制格式(任何应用层协议在网络上进行传输,都是使用二进制流进行的,所以这个说法当然没错)。

有的文章还向读者传递了错误的信息。例如说AMQP消息格式包括两部分:消息头和消息正文。 这是完全错误的,虽然AMQP消息格式确实包括Header和Body部分,但是绝对不止这两个部分。(如果真是这样,ISO/IEC组织就不需要使用125页的文档篇幅来进行说明了)

首先我们需要说明的是,作为一种网络通讯协议,AMQP工作在七层/五层网络模型的应用层,是一个典型的应用层协议;另外,由于AMQP协议存在多种元素定义,且这些元素定义工作在不同的领域。例如Channel的定义是为了基于网络连接记录会话状态;Queue等元素帮助AMQP完成路由规则,这些元素在Message消息记录中都需要有所体现。

所以AMQP协议首先要记录网络状态和会话状态,格式如下(AMQP帧的定义在《OASIS Advanced Message Queueing Protocol 
(AMQP) Version 1.0》文档的第38页):

这里写图片描述

其中非PAYLOAD部分,在网络协议的应用层说明Channel的工作状态(当然还有说明整个AMQP消息的长度区域:SIZE),我们真正需要的内容存在PAYLOAD区域。PAYLOAD区域(译文称为‘交付区’)的格式如下(可以在《OASIS Advanced Message Queueing Protocol 
(AMQP) Version 1.0》文档的第3部分:messaging第82页找到详细说明):

这里写图片描述

在PAYLAOD区域一共包含7个数据区域:header、delivery-annotations、message-annotations、properties、application-properties、application-data、footer。这些元素的作用如下:

  • header:header部分记录了AMQP消息的在‘支持AMQP的中间件’中的交互状态。例如该条消息在节点间被交互的总次数、优先级、TTL(Time To Live)值等信息。

  • delivery-annotations:在header部分只能传递规范的、标准的、经过ISO/IEC组织定义的属性。那么如果需要在header部分传递一些非标准信息怎么办呢?这就是delivery-annotations数据区域存在的意义:用来记录那些’非标’的header信息。

  • message-annotations:这个数据区域,用于存储一些自定义的辅助属性。和delivery-annotations区域的非标准信息不同,这里的自定义属性主要用于消息的转换。例如AMQP-JMS信息转换过程中将依据这个数据区域的“x-opt-jms-type”、“x-opt-to-type”、“x-opt-reply-type”和“name”属性进行JMS规范中对应的“JMSType”、“Type of the JMSDestination”、“Type of the JMSReplyTo”和“JMS_AMQP_MA_name”属相的转换。

  • properties:从整个AMQP消息的properties属性开始,到AMQP消息的application-data部分结束,才是AMQP消息的正文内容(译文称为‘裸消息’)。Properties属性记录了AMQP消息正文的那些‘不可变’属性。在properties部分只能传递规范的、标准的、经过ISO/IEC组织定义的属性。例如:消息id、分组id、发送者id、内容编码等。以下是AMQP协议文档中对Properties部分属性的描述(只能包含这些信息):

<type name="properties" class="composite" source="list" provides="section">
    <descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
    <field name="message-id" type="*" requires="message-id"/>
    <field name="user-id" type="binary"/>
    <field name="to" type="*" requires="address"/>
    <field name="subject" type="string"/>
    <field name="reply-to" type="*" requires="address"/>
    <field name="correlation-id" type="*" requires="message-id"/>
    <field name="content-type" type="symbol"/>
    <field name="content-encoding" type="symbol"/>
    <field name="absolute-expiry-time" type="timestamp"/>
    <field name="creation-time" type="timestamp"/>
    <field name="group-id" type="string"/>
    <field name="group-sequence" type="sequence-no"/>
    <field name="reply-to-group-id" type="string"/>
</type>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • application-properties:‘应用数据’属性,在这部分数据中主要记录和应用有关的数据,AMQP的实现产品(例如RabbitMQ)需要用这部分数据决定其处理逻辑。例如:送入哪一个Exchange、消息的Routing值是什么、是否进行持久化等。

  • application-data:使用二进制格式描述的AMQP消息的用户部分内容。既是我们发送出去的真实内容

  • footer:一般在这个数据区域存储辅助内容,例如消息的哈希值,HMAC,签名或者加密细节。

以上才是一个AMQP消息的完整结构。当然由于篇幅限制,在某一个数据区域的‘标准’属性就没有再细讲了,例如Properties数据区域定义的creation-time属性、Header数据区域定义的durable属性。有兴趣的朋友可以参考《OASIS Advanced Message Queueing Protocol 
(AMQP) Version 1.0》,这个文档我已经上传到我的下载列表中,供大家免费下载^_^(http://download.csdn.net/detail/yinwenjie/9460653)。

3-3-3、Exchange(交换机)路由规则

Exchange交换机在AMQP协议中主要负责按照一定的规则,将收到的消息转发到已经和它事先绑定好的Queue或者另外的Exchange中。Excahnge交换机的这个处理过程称之为Routing(路由)。目前流行的AMQP路由实现一共有三种:分别是Direct Exchange、Fanout Exchange和Topic Exchange。需要特别注意的是:Exhange需要具备怎样的‘路由’规则,并没有在AMQP标准协议进行强行规定,目前流行的AMQP转发规则都是AMQP实现产品自行开发的(这也是为什么AMQP消息中和路由、过滤规则相关的属性是存放在application-properties区域的原因)。

A、Direct路由

direct模式从字面上的理解应该是‘引导’、‘直接’的含义。direct模式下Exchange将使用AMQP消息中所携带的Routing-Key和Queue中的Routing Key进行比较。如果两者完全匹配,就会将这条消息发送到这个Queue中。如下图所示:

这里写图片描述

B、Fanout路由

Fanout路由模式不需要Routing Key。当设置为Fanout模式的Exchange收到AMQP消息后,将会将这个AMQP消息复制多份,分别发送到和自己绑定的各个Queue中。如下图所示:

这里写图片描述

C、Topic路由

Topic模式是Routing Key的匹配模式。Exchange将支持使用‘#’和‘ * ’通配符进行Routing Key的匹配查找(‘#’表示0个或若干个关键词,‘ * ’表示一个关键词,注意是关键词不是字母)。如下图所示:

这里写图片描述

为了方便各位读者的理解,这里我们再举几个通配符匹配的示例:

  • “param.#”,可以匹配“param”、“param.test”、“param.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配诸如“param1.test”、“param2.test”、“param3.test”。以为param这个关键词和param1这个关键词不相同。

  • “param.*.* ”,可以匹配“param.test.test”、“param.test.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配诸如“param”、“param.test”、“parm.child”等等Routing Key。

  • “param.*.value”,可以匹配“param.value.value”、“param.test.value”等Routing Key;但是不能匹配诸如“param.value”、“param.value.child”等Routing Key。

注意,以上介绍的Direct 路由模式和Topic 路由模式中,如果Exchange交换机没有找到任何匹配Routing Key的Queue,那么这条AMQP消息会被丢弃。(只有Queue有保存消息的功能,但是Exchange并不负责保存消息)

4、不得不提的JMS规范

JMS不是消息队列,更不是某种消息队列协议。**JMS是Java消息服务接口,是一套规范的JAVA API 接口。这套规范接口由SUN提出,并在2002年发布JMS规范的Version 1.1版本。**JMS和消息中间件厂商无关,既然是一套接口规范,就代表这它需要各个厂商进行实现。好消息是,大部分消息中间件产品都支持JMS 接口规范。也就是说,您可以使用JMS API来连接Stomp协议的产品(例如ActiveMQ)。就像您可以使用JDBC API来连接ORACLE或者MYSQL一样。

部分网络上的资料都介绍JMS是一个消息队列,这个说法是错误的,会误导读者。难道你能说JDBC是数据库?

当然,这些具体实现JMS规范的JAVA API都是由具体的中间件厂商提供的。下面一段代码演示了如何使用JMS建立与ActiveMQ的连接:

package com.yinwenjie.test.testActivemq.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

/**
 * 测试使用JMS API连接ActiveMQ
 * @author yinwenjie
 */
public class JMSProducer {
    /**
     * 由于是测试代码,这里忽略了异常处理。
     * 正是代码可不能这样做
     * @param args
     * @throws RuntimeException
     */
    public static void main (String[] args) throws Exception {
        // 定义JMS-ActiveMQ连接信息
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616", "username", "password");
        Session session = null;
        Destination sendQueue;
        Connection connection = null;

        //进行连接
        connection = connectionFactory.createConnection();
        connection.start();

        //建立会话
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        //建立queue(当然如果有了就不会重复建立)
        sendQueue = session.createQueue("");
        //建立消息发送者对象
        MessageProducer sender = session.createProducer(sendQueue);
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("这是发送的消息内容");

        //发送(JMS是支持事务的)
        sender.send(outMessage);
        session.commit();

        //关闭
        sender.close();
        connection.close();
        connectionFactory.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

这里,再给各位读者一个官方文档。这个官方文档用于描述ActiveMQ消息中间件中实现的AMQP协议信息转换为JMS服务接口能够识别的数据信息(请仔细理解这句话黑体字部分的描述)。http://activemq.apache.org/amqp.html

5、后文介绍

通过两篇文章的篇幅,我们介绍了典型的消息队列协议。当然还有很多具体的消息队列协议没有讲到,但是通过介绍XMPP、AMQP、Stomp协议可以起到一个管中窥豹可见一斑的效果。另外我们还说明了JMS规范的具体含义,以便帮助读者纠正一些不正确的观点。

下一篇文章开始,我们将讲解两个典型的消息队列中间件:ActiveMQ和RabbitMQ。最后我们还会列举一个实际场景,然后通过消息队列技术一起搭建一个高性能的业务处理方案。

 

http://m.blog.csdn.net/yinwenjie/article/details/50820369

分享到:
评论

相关推荐

    MQ消息序号Message Sequence详解

    在探讨MQ(Message Queue)中的消息序号Message Sequence时,我们首先要理解MQ系统的基本架构及其如何确保消息的有序性和完整性。MQ是一种用于分布式系统间进行消息传递的中间件,它能够实现消息的异步传输、持久...

    新一代DevOps——腾讯企业级消息中间件DevOps实践 共25页.pdf

    - **业务解耦**:实现不同服务间的异步通信,避免系统间的紧密耦合。 - **削峰限流**:缓解高峰时期的压力,防止系统过载。 - **广播**:支持消息的广泛分发,满足一对多的消息传输需求。 - **延时消费**:支持...

    保险架构.pdf————电子版_pdf版

    系统设计应保证这些供应商的接入不会影响现有业务流程,且能够兼容不同供应商的特定需求。 2. OPEN-API平台:这是一个提供集成自由产品、多供应商产品和灵活定价机制的架构方案。通过这种开放式的API(应用程序编程...

    基于Go实现的分布式MQ消息队列

    - **MQ异步通信**:采用消息队列的方式可以有效降低服务间的耦合度。各服务只需要关注与消息队列的交互,而不需要关心其他服务的状态。这种方式不仅提高了系统的响应速度,还增强了系统的容错能力。 #### 三、常见...

    C#下可用的NetMQ.dll和ProtoBuf.dll

    在本文中,我们将深入探讨C#环境下的两个关键库——NetMQ.dll和ProtoBuf.dll,以及它们在实际开发中的应用。这两个库都是C#开发者在处理消息队列和数据序列化时的重要工具。 首先,NetMQ(原名ZeroMQ)是一个高度可...

    大型分布式网站架构设计与实践.rar

    总的来说,《大型分布式网站架构设计与实践》是一本全面解析分布式系统设计的宝典,它不仅介绍了各种核心技术,还分享了实际项目中的经验教训,对于想要提升分布式架构设计能力的IT从业者来说,具有极高的参考价值。...

    通用软件架构设计文档.rar

    《通用软件架构设计文档》这份压缩包包含了软件开发过程中至关重要的一个环节——软件架构设计。软件架构设计是软件开发的基石,它定义了系统的整体结构、组件及其相互关系,为软件项目的成功实施提供了蓝图。这份...

    TIBCO MQ信息存取

    TIBCO MQ允许应用程序之间通过消息队列进行通信,即使在发送和接收方不同时在线的情况下也能确保信息的正确传递。 首先,我们要理解TIBCO MQ的核心概念——消息队列。消息队列是一种异步通信模型,生产者将消息放入...

    spring+MQ消息队列

    在IT行业中,消息队列(Message Queue,简称MQ)是一种重要的中间件技术,它用于解耦应用程序组件,提高系统的响应速度和可扩展性。Spring框架是Java开发中的一个核心工具,它提供了丰富的功能来简化企业级应用的...

    MQ认证考试000-994.pdf

    IBM 000-994 考试是针对WebSphere MQ V6.0系统管理的专业认证,旨在验证考生对IBM的中间件产品——WebSphere MQ的深入理解和实际操作能力。WebSphere MQ,通常被称为MQSeries,是IBM提供的一款企业级消息中间件,它...

    5本架构师必读电子书.rar

    《RabbitMQ——高效部署分布式消息队列》这本书主要涵盖了分布式系统中消息队列的核心概念和技术,特别是关于RabbitMQ的使用。RabbitMQ是一个开源的消息代理和队列服务器,它使得应用程序之间可以通过异步处理进行...

    IBM_MQ_JAVA程序例子

    总之,IBM MQ与Java的结合提供了强大的消息传递能力,适用于分布式系统、微服务架构和任何需要可靠、高效通信的环境。通过这些示例,开发者可以学习如何在Java应用程序中正确地集成和使用IBM MQ,从而实现健壮的异步...

    Qcon北京2018--《实用且适用的架构设计》--邓光耕

    - **消息同步**:利用消息队列(MQ)实现异步通信,增强系统的解耦合度和稳定性。 ### 总结 最后,邓光耕先生总结了几点重要的建议: - **架构根植于业务**:任何架构设计都应紧密围绕业务需求展开,不能脱离实际...

    再深一点:如何给女朋友解释什么是微服务?(csdn)————程序.pdf

    7. **MQ**:消息队列,实现服务间异步解耦。 8. **数据库主从**:提高系统的处理能力。 微服务架构的特点包括: 1. **多服务构成**:系统由多个独立服务组成。 2. **独立部署**:每个服务可单独部署。 3. **松耦合...

    Websphere MQ入门教程

    在对比三种通信技术——直接连接、远程过程调用(RPC)和消息队列时,Websphere MQ的优势在于其消息导向特性。它能够确保消息的可靠传递,即使发送方和接收方在消息传递过程中短暂离线,消息也不会丢失。此外,消息...

    Java毕业设计——基于java博网即时通讯软件的设计与实现(论文+答辩PPT+源代码+数据库).zip

    本项目是一个基于Java技术的博网即时通讯软件的设计与实现,涵盖了从系统需求分析到功能模块的开发,再到数据库设计和界面实现等多个方面。这个毕业设计不仅提供了完整的论文,还包含了答辩PPT、源代码以及数据库...

    MQ选型之RabbitMQ

    MQ,即消息队列(Message Queue),是一种应用程序间通信方法,通过消息的存储与转发来实现组件间的解耦。队列可以被形象地理解为一种数据结构,它遵循先进先出(FIFO)原则。消息队列作为一种软件架构模式,允许程序...

    MQ服务器端

    RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中的异步任务处理、解耦通信以及可靠消息传递。在本篇中,我们将深入探讨RabbitMQ Server的...

    stompest-mq python

    标题中的"stompest-mq python"指的是使用Python语言实现的Stomp协议库,用于消息队列(MQ)通信。Stomp是一种简单易用、跨平台的网络协议,专门用于在分布式环境中传输消息。在Python中,stompest库提供了与Stomp...

Global site tag (gtag.js) - Google Analytics