`
flyingdutchman
  • 浏览: 358955 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

RabbitMQ:镜像队列Mirrored queue

阅读更多
        在上一节《RabbitMQ集群类型一:在单节点上构建built-in内置集群》中我们已经学习过:在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务。
        那么是不是有消息冗余的解决方案呢?这就要提到从RabbitMQ 2.6.0版本开始提供支持 镜像队列(Mirrored Queue),消息会在rabbitmq节点之间的被创建为“镜像队列”的队列之间复制。和其它的主从设计一样,镜像队列也有master和slave的概念,一旦某个节点当掉,会在其余的节点中选举一个slave作为master。

        在镜像队列(Mirrored Queue)中,只有master的copy对外提供服务,而其他slave copy只提供备份服务,在master copy所在节点不可用时,选出一个slave copy作为新的master继续对外提供服务。

        之前我们也将过,rabbitmq会同过一种“选举”机制在余下的所有的salve中选出一个

        理想总是简单,真实却总是复杂。如果我们要在运行时添加一个新的节点到集群中,消息复制会怎么处理?如果有新节点加入,RabbitMQ不会同步之前的历史数据,新结点只会复制该结点加入到集群之后新增消息.这里的假设是随着消息的被consumer取走,最终所有的节点的数据都会对齐一致。
        接下来,一个自然的追问就“诞生”了:既然master节点退出集群会选一个slave作为master,那么如果不幸选中了一个刚刚加入集群的节点怎么办?那消息不就丢了吗!?这里您可以把心放到您的肚子里,RabbitMQ集群内部会维护节点的状态是否已经同步,使用rabbitmqctl的synchronised_slave_pids参数,就可以查看状态.如果slave_pids和synchronised_slave_pids里面的节点是一致的,那说明全都同步了.如果不一致很容易比较出来哪些还没有同步,集群只会在“最老”的slave节点之间选一个出来作为新的master节点。

        镜像队列分为两种:集群内全节点复制的镜像队列和集群内局部节点复制的镜像队列。先面我们就看一下如何创建镜像队列Mirrored Queue:其实很简单,只要在创建消息队列时,添加一个叫“x-ha-policy”的key/value对就可以了:
引用

        //创建集群内全节点复制的镜像队列
        ...
        queue_args = {'x-ha-policy' : 'all'}
        channel.queue_declare(queue = 'hello-queue',arguments = queue_args)
        ...


        //创建集群内局部节点复制的镜像队列
        ...
        queue_args = {'x-ha-policy' : 'nodes',
                      'x-ha-policy-params' : [rabbit@JackyChen,rabbit3@JackyChen]}
        channel.queue_declare(queue = 'hello-queue',arguments = queue_args)
        ...

       


        下面我们通过实际的python代码并执行相关的操作来验证一下镜像队列:
引用

        # mkdir -p /data/rabbitmq-pika/c5
        # cd /data/rabbitmq-pika/c5
        # touch hello_world_mirrored_producer.py
        # chmod +x hello_world_mirrored_producer.py
        # touch hello_world_mirrored_consumer.py
        # chmod +x hello_world_mirrored_consumer.py

       

        其中hello_world_mirrored_producer.py代码如下:
引用

#!/usr/bin/env python
#coding=utf-8

import pika,sys
from pika import spec

#在"/"虚拟主机vhost上通过用户guest建立channel通道
user_name = 'guest'
user_passwd = 'guest'
target_host = 'JackyChen'
vhost = '/'
cred = pika.PlainCredentials(user_name,user_passwd)
conn_params = pika.ConnectionParameters(target_host,
                                        virtual_host = vhost,
                                        credentials = cred)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

#创建一个direct类型的、持久化的、没有consumer时队列是否自动删除的exchage交换机
channel.exchange_declare(exchange = 'hello-exch',
                         type = 'direct',    
                         passive = False,    
                         durable = True,    
                         auto_delete = False)    
                        
#使用接收到的信息创建消息
msg = sys.argv[1]
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'
#持久化消息
msg_props.delivery_mode = 2
msg_ids = []

print 'ready to publish...'
#发布消息
channel.basic_publish(body = msg,
                      exchange = 'hello-exch',
                      properties = msg_props,
                      routing_key = 'hala')
print 'published!'
msg_ids.append(len(msg_ids) + 1)
print len(msg_ids)
channel.close()
conn_broker.close()
       

        hello_world_mirrored_consumer.py代码如下:
引用

#!/usr/bin/env python
#coding=utf-8

import pika

#在"/"虚拟主机vhost上通过用户guest建立channel通道
user_name = 'guest'
user_passwd = 'guest'
target_host = 'JackyChen'
vhost = '/'
cred = pika.PlainCredentials(user_name,user_passwd)
conn_params = pika.ConnectionParameters(target_host,
                                        virtual_host = vhost,
                                        credentials = cred)
conn_broker = pika.BlockingConnection(conn_params)
conn_channel = conn_broker.channel()

#创建一个direct类型的、持久化的、没有consumer时,队列是否自动删除exchage交换机
conn_channel.exchange_declare(exchange = 'hello-exch',
                              type = 'direct',
                              passive = False, 
                              durable = True, 
                              auto_delete = False) 
 
#创建一个持久化的、没有consumer时队列是否自动删除的名为“hello-queue”
#创建集群内全节点复制的镜像队列
queue_args = {'x-ha-policy' : 'all'}
#创建集群内局部节点复制的镜像队列
#queue_args = {'x-ha-policy' : 'nodes','x-ha-policy-params' : ['rabbit@JackyChen','rabbit3@JackyChen']}
conn_channel.queue_declare(queue = 'hello-queue', 
                           durable = True, 
                           auto_delete = False,
                           arguments = queue_args)
 
#将“hello-queue”队列通过routing_key绑定到“hello-exch”交换机 
conn_channel.queue_bind(queue = 'hello-queue',
                        exchange = 'hello-exch',
                   routing_key = 'hala')
 
#定义一个消息确认函数,消费者成功处理完消息后会给队列发送一个确认信息,然后该消息会被删除
def ack_info_handler(channel,method,header,body):
    """ack_info_handler """
    print 'ack_info_handler() called!' 
    if body == 'quit':
        channel.basic_cancel(consumer_tag = 'hello-hala')
        channel.stop_sonsuming()
    else: 
        print body
        channel.basic_ack(delivery_tag = method.delivery_tag)

conn_channel.basic_consume(ack_info_handler, 
                           queue = 'hello-queue',
                           no_ack = False,
                           consumer_tag = 'hello-hala')
 
print 'ready to consume msg...'
conn_channel.start_consuming()
       


        打开rabbitmq集群中所有节点:
引用

        # /opt/mq/rabbitmq/sbin/rabbitmqctl start_app
        # /opt/mq/rabbitmq2/sbin/rabbitmqctl start_app
        # /opt/mq/rabbitmq3/sbin/rabbitmqctl start_app

       

        然后执行:
引用

        # ./hello_world_mirrored_consumer.py
        # ./hello_world_mirrored_producer.py
       



        注意:上面是rabbitmq 3.0之前的创建镜像队列的方法,3.0之后改为通过
引用

        //给所有以“hello”开头为名创建的消息队列设置为集群内全节点复制的镜像队列
        # ./rabbitmqctl set_policy ha-all2 "^hello.*" '{"ha-mode":"all"}'

       


        创建集群内局部节点复制的镜像队列:
引用

       
        # ./rabbitmqctl set_policy ha-all "^halo.*" '{"ha-mode":"nodes","ha-params":["rabbit@JackyChen","rabbit3@JackyChen"]}'

       


        另外还有一种的镜像队列:
引用

        //只指定在整个集群节点中只包含count = n 个镜像的镜像列表
        # ./rabbitmqctl set_policy ha-all3 "^alert.*" '{"ha-mode":"exactly","count":2}'

       


       

       
  • 大小: 34.5 KB
  • 大小: 110.8 KB
分享到:
评论

相关推荐

    RabbitMQ实战 高效部署分布式消息队列

    节点间的数据同步是通过镜像队列(Mirrored Queues)实现的,保证了即使某个节点故障,消息也不会丢失。 2. 负载均衡:通过交换机的类型(如Direct、Fanout、Topic、Header)和路由规则,可以将消息均匀分发到不同的...

    RabbitMQ高效部署分布式消息队列实战篇

    实战篇还将涵盖高级主题,如死信队列(Dead Letter Queue)、延迟队列(Delayed Queue)、高可用集群(Clustering)和镜像队列(Mirrored Queues)等。这些都是在生产环境中确保RabbitMQ稳定性和高可用性的关键配置...

    RabbitMQ备忘录一

    在实际应用中,我们还需要了解RabbitMQ的一些高级特性,比如死信队列(Dead Letter Exchange & Queue)、镜像队列(Mirrored Queues)以保证高可用性,以及RabbitMQ管理插件(RabbitMQ Management Console)提供的...

    RabbitMQ实战++高效部署分布式消息队列

    此外,镜像队列(Mirrored Queues)可以进一步提高数据安全性。 **三、分布式消息队列** 1. **消息一致性**:在分布式环境中,确保消息的顺序性和一致性至关重要。RabbitMQ提供了事务和确认机制来保证消息的可靠性...

    rabbitmq-server-windows-3.9.13

    2. **镜像队列(Mirrored Queues)**:在集群中,队列可以被镜像到多个节点,确保即使部分节点失效,数据也不会丢失。 3. **网络分区(Network Partitions)**:RabbitMQ能够优雅地处理网络分区,通过使用一种名为...

    rabbitmq.zip

    - **Mirrored Queues**: 队列镜像可以在集群中创建队列副本,提高数据安全性。 - ** shovel**: 用于在不同RabbitMQ实例间迁移或复制消息。 ### 6. Erlang与RabbitMQ - **Erlang OTP**: Erlang的开放电信平台,...

    rabbitmq 实战练习-rabbitmq-actual.zip

    - **Mirrored Queues**:镜像队列在集群中复制队列,确保高可用性。 - **Network Partitions (HaPartitions)**:处理网络分区情况,保持服务连续性。 7. **RabbitMQ与其他技术的集成** - **Spring Framework**:...

    RabbitMQ面试题(面试题整合)

    - **Mirrored Queues**:镜像队列,队列在集群中的多个节点有副本,保证数据安全性。 - **Sharding**:队列分片,提高处理能力。 6. **RabbitMQ的管理工具** - **RabbitMQ Management Console**:Web界面,用于...

    【面试资料】-(机构内训资料)RabbitMQ消息中间件面试题.zip

    - **Mirrored Queues**:镜像队列,队列中的消息会在集群中的所有节点上复制,保证高可用性。 6. **消费模式** - **Auto-Ack**:自动确认,消费者接收到消息后,RabbitMQ会默认认为消息已被成功处理。 - **...

    rabbitmq面试题.pdf

    RabbitMQ的镜像队列(Mirrored Queues)是什么?** - 镜像队列是指在集群内为每个队列创建一个副本,提高队列的可用性和容错能力。 **15. 如何实现RabbitMQ的高可用?** - 通过配置镜像队列、设置故障转移策略等...

    RabbitMQ安装包

    - **Mirrored Queues**: 队列可以在集群中的多个节点间镜像,增加数据安全性。 5. **监控与管理** - **RabbitMQ Management Console**: 提供了一个Web界面,可以监控节点状态、队列、用户、权限等信息。 - **...

    otp-win64-25.1.2 rabbitmq-server-3.11.2

    学习如何配置和管理RabbitMQ集群,以及了解镜像队列(Mirrored Queues)等高可用性策略,对于生产环境中的RabbitMQ部署至关重要。 5. **RabbitMQ管理工具**: RabbitMQ提供了一个Web管理界面,可以用来监控服务器...

    RabbitMQ系统升级相关1

    14. **mirrored_supervisor**:镜像监督器的更新,确保节点失效时的优雅重启。 15. **topic_trie_node**:主题 Trie 节点的管理,优化大规模主题场景下的性能。 16. **runtime_parameters**:运行时参数的更新,...

    RabbitMQ学习笔记

    - **Mirrored Queues**: 队列可以在多个节点间镜像,保证即使节点故障,数据也不会丢失。 6. **持久化与非持久化** - **Persistent Messages**: 消息可以设置为持久化,即使服务器重启也不会丢失。 - **Non-...

    rabbitmq-playground

    - **Mirrored Queues**:镜像队列,用于高可用性,队列在多个节点间复制。 在`rabbitmq-playground-main`中,你可以通过编写和运行代码来实验这些概念,加深对RabbitMQ的理解。这个环境通常包含示例代码,演示如何...

    mq消息中间件.rar

    - **镜像队列(Mirrored Queues)**:提供高可用性,队列内容在多个节点间复制,防止单点故障。 - **延迟队列(Delayed Exchange)**:允许设置消息在特定时间后才被处理。 3. **Spring家族整合** - **Spring ...

Global site tag (gtag.js) - Google Analytics