阅读更多

3顶
0踩

企业架构

原创新闻 RabbitMQ在分布式系统的应用

2016-05-16 16:31 by 副主编 mengyidan1988 评论(3) 有9191人浏览
引用

声明:本文CSDN作者原创投稿文章,未经许可禁止任何形式的转载。
作者:吕舜,MaxLeap 团队_Service&Infra成员。
责编:钱曙光,关注架构领域,寻求报道或者投稿请发邮件qianshg@csdn.net,另有「CSDN 高级架构师群」,内有诸多知名互联网公司的大牛架构师,欢迎架构师加微信qshuguang2008申请入群,备注姓名+公司+职位。

由于之前做的项目中需要在多个节点之间可靠地通信,所以废弃了之前使用的Redis pub/sub(因为集群有单点问题,且有诸多限制),改用了RabbitMQ。使用期间得到不少收获,也踩了不少坑,所以在此分享下心得。

怎么保证可靠性的?
RabbitMQ提供了几种特性,牺牲了一点性能代价,提供了可靠性的保证。

持久化
当RabbitMQ退出时,默认会将消息和队列都清除,所以需要在第一次声明队列和发送消息时指定其持久化属性为true,这样RabbitMQ会将队列、消息和状态存到RabbitMQ本地的数据库,重启后会恢复。
durable=true
channel.queueDeclare("task_queue", durable, false, false, null);//队列 
channel.basicPublish("", "task_queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());//消息

注:当声明的队列已经存在时,尝试重新定义它的durable是不生效的。

接收应答
客户端接收消息的模式默认是自动应答,但是通过设置autoAck为false可以让客户端主动应答消息。当客户端拒绝此消息或者未应答便断开连接时,就会使得此消息重新入队(在版本2.7.0以前是到重新加入到队尾,2.7.0及以后是保留消息在队列中的原来位置)。
autoAck = false;
requeue = true;
channel.basicConsume(queue, autoAck, callback);
channel.basicAck();//应答
channel.basicReject(deliveryTag, requeue);//拒绝
channel.basicRecover(requeue);//恢复

发送确认
默认情况下,发送端不关注发出去的消息是否被消费掉了。可设置channel为confirm模式,所有发送的消息都会被确认一次,用户可以自行根据server发回的确认消息查看状态。详细介绍见:confirms
channel.confirmSelect(); // 进入confirm模式
do publish messages... // 每个消息都会被编号,从1开始
channel.getNextPublishSeqNo() // 查看下一个要发送的消息的序号
channel.waitForConfirms(); // 等待所有消息发送并确认

事务:和confirm模式不能同时使用,而且会带来大量的多余开销,导致吞吐量下降很多,故而不推荐。
channel.txSelect();
try {
    do something...
    channel.txCommit();
} catch (e){
    channel.txRollback();
}

消息队列的高可用(主备模式)
相比于路由和绑定,可以视为是共享于所有的节点的,消息队列默认只存在于第一次声明它的节点上,这样一旦这个节点挂了,这个队列中未处理的消息就没有了。 幸好,RabbitMQ提供了将它备份到其他节点的机制,任何时候都有一个master负责处理请求,其他slaves负责备份,当master挂掉,会将最早创建的那个slave提升为master。

命令:rabbitmqctl set_policy ha-all “^ha\.” ‘{“ha-mode”:”all”}’:设置所有以’ha’开头的queue在所有节点上拥有备份。详细语法点这里;也可以在界面上配置。



注:由于exclusive类型的队列会在client和server连接断开时被删掉,所以对它设置持久化属性和备份都是没有意义的。

顺序保证
直接上图好了:



一些需要注意的地方
集群配置:
一个集群中多个节点共享一份.erlang.cookie文件;若是没有启用RABBITMQ_USE_LONGNAME,需要在每个节点的hosts文件中指定其他节点的地址,不然会找不到其他集群中的节点。

脑裂:
RabbitMQ集群对于网络分区的处理和忍受能力不太好,推荐使用federation或者shovel插件去解决。federation详见高级->Federation。但是,情况已经发生了,怎么去解决呢?放心,还是有办法恢复的。当网络断断续续时,会使得节点之间的通信断掉,进而造成集群被分隔开的情况。这样,每个小集群之后便只处理各自本地的连接和消息,从而导致数据不同步。当重新恢复网络连接时,它们彼此都认为是对方挂了-_-||,便可以判断出有网络分区出现了。但是RabbitMQ默认是忽略掉不处理的,造成两个节点继续各自为政(路由,绑定关系,队列等可以独立地创建删除,甚至主备队列也会每一方拥有自己的master)。可以更改配置使得连接恢复时,会根据配置自动恢复。

ignore:默认,不做任何处理

pause-minority:断开连接时,判断当前节点是否属于少数派(节点数少于或者等于一半),如果是,则暂停直到恢复连接。

{pause_if_all_down, [nodes], ignore | autoheal}:断开连接时,判断当前集群中节点是否有节点在nodes中,如果有,则继续运行,否则暂停直到恢复连接。这种策略下,当恢复连接时,可能会有多个分区存活,所以,最后一个参数决定它们怎么合并。

autoheal:当恢复连接时,选择客户端连接数最多的节点状态为主,重启其他节点。

配置:**【详见下文:集群配置】

多次ack:客户端多次应答同一条消息,会使得该客户端收不到后续消息。

结合Docker使用
集群版本的实现:详见我自己写的一个例子rabbitmq-server-cluster

消息队列中间件的比较
RabbitMQ:
优点:支持很多协议如:AMQP,XMPP,STMP,STOMP;灵活的路由;成熟稳定的集群方案;负载均衡;数据持久化等。
缺点:速度较慢;比较重量级,安装需要依赖Erlang环境。
Redis:
优点:比较轻量级,易上手
缺点:单点问题,功能单一
Kafka:
优点:高吞吐;分布式;快速持久化;负载均衡;轻量级
缺点:极端情况下会丢消息
最后附一张网上截取的测试结果:



更多性能参数见:RabbitMQ Performance Measurements

如果有兴趣简单了解下RabbitMQ,可以继续往下看~

几个重要的概念
  • Virtual Host:包含若干个Exchange和Queue,表示一个节点;
  • Exchange:接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct、fanout、topic三种;
  • Binding:连接Exchange和Queue,包含路由规则;
  • Queue:消息队列,存储还未被消费的消息;
  • Message:Header+Body;
  • Channel:通道,执行AMQP的命令;一个连接可创建多个通道以节省资源。

Client
RabbitMQ官方实现了很多热门语言的客户端,就不一一列举啦,以java为例,直接开始正题:

建立连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

可以加上断开重试机制:
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);

创建连接和通道:
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

一对一:一个生产者,一个消费者



代码同上,只不过会有多个消费者,消息会轮序发给各个消费者。

如果设置了autoAck=false,那么可以实现公平分发(即对于某个特定的消费者,每次最多只发送指定条数的消息,直到其中一条消息应答后,再发送下一条)。需要在消费者中加上:
int prefetchCount = 1;
channel.basicQos(prefetchCount);

其他同上。

广播



生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

消费者同上。

RPC



其实就是一对一模式的一种用法:

首先,客户端发送一条消息到服务端声明的队列,消息属性中包含reply_to和correlation_id
引用

- reply_to 是客户端创建的消息的队列,用来接收远程调用结果
- correlation_id 是消息的标识,服务端回应的消息属性中会带上以便知道是哪条消息的结果。

然后,服务端接收到消息,处理,并返回一条结果到reply_to队列中,最终,客户端接收到返回消息,继续向下处理。

Server
支持各大主流操作系统,这里以Unix为例介绍下常用配置和命令:

安装
由于RabbitMQ是依赖于Erlang的,所以得首先安装最近版本的Erlang。
单点的安装比较简单,下载解压即可。

下载地址:http://www.rabbitmq.com/download.html
  • 配置:(一般的,用默认的即可。)
  • $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf: 环境变量默认配置(也可在启动脚本中设置,且以启动命令中的配置为准)。常用的有:
  • RABBITMQ_NODENAME:节点名称,默认是rabbit@$HOSTNAME。
  • RABBITMQ_NODE_PORT:协议端口号,默认5672。
  • RABBITMQ_SERVER_START_ARGS:覆盖rabbitmq.config中的一些配置。
  • $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config: 核心组件,插件,erlang服务等配置,常用的有:
  • disk_free_limit:队列持久化等信息都是存到RabbitMQ本地的数据库中的,默认限制50000000(也就是最多只让它使用50M空间啦,不够可以上调,也支持空闲空间百分比的配置)。要是超标了,它就罢 工了……
  • vm_memory_high_watermark:内存使用,默认0.4(最多让它使用40%的内存,超标罢 工)

(注:若启动失败了,可以在启动日志中查看到具体的错误信息。)
命令:
引用

rabbitmqctl stop_app
rabbitmqctl join_cluster [--ram] nodename@hostname:将当前节点加入到集群中;默认是以disc节点加入集群,加上--ram为ram节点。
rabbitmqctl start_app
rabbitmqctl cluster_status:查看集群状态

集群
集群节点共享所有的状态和数据,如:用户、路由、绑定等信息(队列有点特殊,虽然从所有节点都可达,但是只存在于第一次声明它的那个节点上,解决方案——详见上文:消息队列的高可用);每个节点都可以接收连接,处理数据。

集群节点有两种,disc:默认,信息存在本地数据库;ram:加入集群时,添加–ram参数,信息存在内存,可提高性能。

配置:(一般的,用默认的即可。)
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf:
RABBITMQ_USE_LONGNAME:默认false,(默Rene的,RABBITMQ_NODENAME中@后面的\$HOSTNAME是主机名,所以需要集群中每个节点的hosts文件包含其他节点主机名到地址的映射。但是如果设置为true,就可以定义RABBITMQ_NODENAME中的$HOSTNAME为域名了)
RABBITMQ_DIST_PORT:集群端口号,默认RABBITMQ_NODE_PORT + 20000
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config:
cluster_nodes:设置后,在启动时会尝试自动连接加入的节点并组成集群。
cluster_partition_handling:【详见上文:网络分区的处理】
更多详细的配置见:配置

命令:
rabbitmqctl stop_app
rabbitmqctl join_cluster [--ram] nodename@hostname:将当前节点加入到集群中;默认是以disc节点加入集群,加上--ram为ram节点。
rabbitmqctl start_app
rabbitmqctl cluster_status:查看集群状态

(注:如果加入集群失败,可先查看)
  • 每个节点的$HOME/.erlang.cookie内容一致;
  • 如果hostname是主机名,那么此hostname和地址的映射需要加入hosts文件中;
  • 如果使用的是域名,那么需要设置RABBITMQ_USE_LONGNAME为true。

(注:docker版集群的见:rabbitmq-server-cluster)
高级
AMQP协议简介
RabbitMQ原生支持AMQP 0-9-1并扩展实现了了一些常用的功能:AMQP 0-9-1

包含三层:
  • 模型层: 最高层,提供了客户端调用的命令,如:queue.declare,basic.ack,consume等。
  • 会话层:将命令从客户端传递给服务器,再将服务器的应答传递给客户端,会话层为这个传递过程提供可靠性、同步机制和错误处理。
  • 传输层:主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。




注:其他协议的支持见:RabbitMQ支持的协议

常用插件
管理界面(神器)

启动后,执行rabbitmq-plugins enable rabbitmq_management→访问http://localhost:15672→查看节点状态,队列信息等等,甚至可以动态配置消息队列的主备策略,如下图:



Federation

启用Federation插件,使得不同集群的节点之间可以传递消息,从而模拟出类似集群的效果。这样可以有几点好处:
  • 松耦合:联合在一起的不同集群可以有各自的用户,权限等信息,无需一致;此外,这些集群的RabbitMQ和Erlang的版本可以不一致。
  • 远程网络连接友好:由于通信是遵循AMQP协议的,故而对断断续续的网络连接容忍度高。
  • 自定义:可以自主选择哪些组件启用federation。

几个概念:
  • Upstreams:定义上游节点信息,如下:

引用

rabbitmqctl set_parameter federation-upstream my-upstream '{"uri":"amqp://server-name","expires":3600000}' 定义一个my-upstream

  • uri是其上游节点的地址,多个upstream的节点无需在同一集群中。
  • expires表示断开连接3600000ms后其上游节点会缓存消息。
  • Upstream sets:多个Upstream的集合;默认有个all,会将所有的Upstream加进去。
  • Policies:定义哪些exchanges,queues关联到哪个Upstream或者Upstream set,如:

rabbitmqctl set_policy --apply-to exchanges federate-me "^amq\." '{"federation-upstream-set":"all"}'

将此节点所有以amq.开头的exchange联合到上游节点的同名exchange。
注:
  • 由于下游节点的exchange可以继续作为其他节点的上游,故可设置成循环,广播等形式。
  • 通过max_hops参数控制传递层数。
  • 模拟集群,可以将多个节点两两互连,并设置max_hops=1。




rabbitmq-plugins enable rabbitmq_federation

如果启用了管理界面,可以添加:
rabbitmq-plugins enable rabbitmq_federation_management

这样就可以在界面配置Upstream和Policy了。

注:如果在一个集群中使用federation,需要该集群每个节点都启用Federation插件;
注:更多插件请见:插件
  • 大小: 304.1 KB
  • 大小: 21.2 KB
  • 大小: 13.2 KB
  • 大小: 11.4 KB
  • 大小: 8.8 KB
  • 大小: 17.5 KB
  • 大小: 7 KB
  • 大小: 242 KB
  • 大小: 81.5 KB
3
0
评论 共 3 条 请登录后发表评论
3 楼 ycong2525 2016-05-25 14:23
     
2 楼 莫欺少年穷Java 2016-05-18 21:16
  
1 楼 netkiller.github.com 2016-05-17 13:56
写的不错。

发表评论

您还没有登录,请您登录后再发表评论

相关推荐

  • RabbitMQ在分布式系统中的应用

    持久化当RabbitMQ退出时,默认会将消息和队列都清除,所以需要在第一次声明队列和发送消息时指定其持久化属性为true,这样RabbitMQ会将队列、消息和状态存到RabbitMQ本地的数据库,重启后会恢复。java: 注:当声明...

  • 通过RabbitMQ实现分布式事务

    通过RabitMQ实现分布式事务题前言业务需求核心原理核心难点解决问题思路代码实现其他总结 前言 这篇文章是通过学习哔哩哔哩中的视频“阿里架构师如何30分钟基于MQ解决...这涉及到两个系统,需要用到分布式事务。 核心原

  • RabbitMQ-分布式原理和实现

    RabbitMQ的分布式 首先我们要了解RabbitMQ的集群架构模式,比如主备、shovels、镜像集群队列、异步多集群 然后从0开始构建一个异步的镜像队列集群,然后整合HAProxy和keepalive,实现高可用、高可靠。 然后我们来...

  • SpringBoot整合消息中间件 RabbitMQ 第 10 篇 —— RabbitMQ 解决分布式事务(完结)

    在多数据源的情况下,两个服务相互通讯的时候,就会出现数据不一致的问题,从而出现分布式事务产生的原因。举例:某电商平台,用户先下单后,扣库存失败,那么将会导致超卖;如果下单不成功,扣库存成功,那么会导致...

  • 分布式消息队列--RabbitMQ

    rabbitmq基本介绍、使用与部署

  • 分布式消息中间件RabbitMQ解析

    RabbitMQ作为分布式消息存储和转发系统,已广泛使用于分布式系统中。本文简要介绍RabbitMQ相关概念、集群架构和消息转发流程,并与Kafka做了简要对比,以加深理解。

  • RabbitMQ高级 -- 分布式事务

    主要是应用在我们 springCloud(微服务)当中 ~ 具体是怎么应用的呢 ,看下图 ! 解释: 如上图所示,现在有订单服务和配送中心两个服务,每个服务都有自己独立的数据库,假如有一个用户在订单服务下了下单,需要往...

  • RabbitMQ-分布式系统中的一把利剑

    你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。 消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你...

  • RabbitMQ 实现分布式事务 通俗理解 简单易学(思想)

    一. 什么是分布式事务 ...所以我们在微服务架构开发的时候,一定要处理好分布式事务 二. 分布式事务理论 比较流行的就是BASE,CAP定理 CAP理论: 是由加州大学伯克利分校Eric Brewer教授提出来的,他指...

  • 【Spring Cloud + RabbitMQ 实现分布式消息总线】—— 每天一点小知识

    通过完成上述步骤,你可以结合 Spring Cloud 和 RabbitMQ 实现配置刷新、事件广播、服务监控以及微服务间通信的功能。这些功能可以提供更强大的分布式系统能力,并帮助实现解耦、异步处理和实时监控的目标。

  • RabbitMQ 高级:分布式事务简述(一)

    例如在下单场景下,订单服务和配送中心如果不在同一个节点上,就涉及分布式事务。 功能描述 订单和配送中心两个服务之间是独立的,现在要把它们变为一个“ 整体 ”:用户下单,订单系统完成订单创建,再远程...

  • RabbitMQ分布式集群架构

    RabbitMQ分布式集群架构和高可用性(HA) (一) 功能和原理 设计集群的目的 允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行 通过增加更多的节点来扩展消息通信的吞吐量 1 集群配置方式 ...

  • 基于java网上球鞋竞拍系统设计与实现.docx

    基于java网上球鞋竞拍系统设计与实现.docx

  • 基于bert实现关系三元组抽取python源码+数据集+项目说明.zip

    基于bert实现关系三元组抽取python源码+数据集+项目说明.zip基于bert实现关系三元组抽取python源码+数据集+项目说明.zip基于bert实现关系三元组抽取python源码+数据集+项目说明.zip基于bert实现关系三元组抽取python源码+数据集+项目说明.zip基于bert实现关系三元组抽取python源码+数据集+项目说明.zip 个人大四的毕业设计、课程设计、作业、经导师指导并认可通过的高分设计项目,评审平均分达96.5分。主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。 [资源说明] 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设或者课设、作业,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96.5分,放心下载使用! 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),供学习参考。

  • 基于java的足球赛会管理系统设计与实现.docx

    基于java的足球赛会管理系统设计与实现.docx

  • 基于java的婚纱摄影网的设计与实现.docx

    基于java的婚纱摄影网的设计与实现.docx

  • 基于安卓的美颜相机,可以通过opencv加滤镜,并调整亮度和对比度,可以磨皮,但并不能瘦脸,磨皮时非常卡顿,暂无解决方法.zip

    项目工程资源经过严格测试可直接运行成功且功能正常的情况才上传,可轻松复刻,拿到资料包后可轻松复现出一样的项目,本人系统开发经验充足(全领域),有任何使用问题欢迎随时与我联系,我会及时为您解惑,提供帮助。 【资源内容】:包含完整源码+工程文件+说明(如有)等。答辩评审平均分达到96分,放心下载使用!可轻松复现,设计报告也可借鉴此项目,该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的。 【提供帮助】:有任何使用问题欢迎随时与我联系,我会及时解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 下载后请首先打开README文件(如有),项目工程可直接复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用

  • 基于java的农产品仓库管理系统系统设计与实现.docx

    基于java的农产品仓库管理系统系统设计与实现.docx

  • 基于Java swing +mysql(Oracle)实现的飞机订票系统项目(含毕业论文+答辩 ppt+双数据库版本源码+图)

    【作品名称】:基于Java swing +mysql(Oracle)实现的飞机订票系统项目(含毕业论文+答辩 ppt+双数据库版本源码) 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】: 系统功能需求 本系统用于远程机票预订,包括远程航班信息查询、机票预订与确认等;主要分为四大功能:查询、订票、退票和管理。 管理员登录、注销 到系统并进行插入、删除、更新以及查看机票后台数据库操作 插入:机票的插入可以按照航班号、班期、公司、座位号、起飞地以及抵达地等等插入数据库。 删除:机票可以按照航班号、起止城市、星期进行删除 3.1.1客户端系统功能 1.普通用户: 查询:根据航班号、航空公司以及目的地查询出票类信息 订票: 根据出发日期和第一航班号预订机票,机票类型分为单 【资源声明】:本资源作为“参考资料”而不是“定制需求”,代码只能作为参考,不能完全复制照搬。不一定能够满足所有人的需求,需要有一定的基础能够看懂代码,能够自行调试代码并解决报错,能够自行添加功能修改代码。

Global site tag (gtag.js) - Google Analytics