`
manzhizhen
  • 浏览: 293381 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

RocketMQ初探一:NameServer的作用

阅读更多

第一次真正接触Java消息服务是在2013年底,当时是给中国移动做统一支付平台,当时用的就是著名的Apache ActiveMQ,当时觉得很有趣,一个服务队列竟然可以玩出这么多花样来。当时为了尽快的入门,还把《Java Message Service》给看了一遍,这对于初学者的我收获颇多。我说知道的完全实现JMS规范的MOMActiveMQ/ApolloHornetQ,都是采用Java实现。JMS中说明了Java消息服务的两种消息传送模型,即P2P(点对点)和Pub/Sub(发布订阅),在约定了一些消息服务特性的同时,并提供了一套接口API,是否实现了该API,标志着MOM是否支持JMS规范,JMS规范中定义了消息服务诸多特性,这些特性和他所面对的企业级服务场景相关,当然,这也严重限制了消息服务的吞吐量,完全实现JMS规范的MOM,性能总不会太高,而且JMS规范中没有涉及消息服务的分布式特性,导致大多数实现JMS规范的MOM分布式部署功能比较弱,只适合集群部署。

 

说到高性能消息中间件,第一个想到的肯定是LinkedIn开源的Kafka,虽然最初Kafka是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。RocketMQMetaQ3.0版本,而MetaQ最初的设计又参考了Kafka。最初的MetaQ 1.x版本由阿里的原作者庄晓丹开发,后面的MetaQ 2.x版本才进行了开源,这里需要注意一点的事,MetaQ 1.xMetaQ 2.x是依赖ZooKeeper的,但RocketMQ(即MetaQ 3.x)却去掉了ZooKeeper依赖,转而采用自己的NameServer

 

ZooKeeper是著名的分布式协作框架,提供了Master选举、分布式锁、数据的发布和订阅等诸多功能,为什么RocketMQ没有选择ZooKeeper,而是自己开发了NameServer,我们来具体看看NameServerRocketMQ集群中的作用就明了了。

 

RocketMQBroker有三种集群部署方式:1.单台Master部署;2.多台Master部署;3.MasterSlave部署;采用第3种部署方式时,MasterSlave可以采用同步复制和异步复制两种方式。下图是第3种部署方式的简单图:

 


 

 

 

图虽然是网上找的,但也足以说明问题,当采用多Master方式时,MasterMaster之间是不需要知道彼此的,这样的设计直接降低了Broker实现的复查性,你可以试想,如果MasterMaster之间需要知道彼此的存在,这会需要在Master之中维护一个网络的Master列表,而且必然设计到Master发现和活跃Master数量变更等诸多状态更新问题,所以最简单也最可靠的做法就是Master只做好自己的事情(比如和Slave进行数据同步)即可,这样,在分布式环境中,某台Master宕机或上线,不会对其他Master造成任何影响。

 

那么怎么才能知道网络中有多少台MasterSlave呢?你会很自然想到用ZooKeeper,每个活跃的MasterSlave都去约定的ZooKeeper节点下注册一个状态节点,但RocketMQ没有使用ZooKeeper,所以这件事就交给了NameServer来做了(看上图)。

 

结论一:NameServer用来保存活跃的broker列表,包括MasterSlave

当然,这个结论百度一查就知道,我们移步到rocketmq-namesrv模块中最重要的一个类:RouteInfoManager,它的主要属性如下:

 

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

 

每个属性通过名字就能清楚的知道是什么意思,之所以能用非线程安全的HashMap,是因为有读写锁lock来对HashMap的修改做保护。我们注意到保存brokerMap有两个,即brokerAddrTable用来保存所有的broker列表和brokerLiveTable用来保存当前活跃的broker列表,而BrokerData用来保存broker的主要新增,而BrokerLiveInfo只用来保存上次更新(心跳)时间,我们可以直接看看RouteInfoManager中扫描非活跃broker的方法:

 

// Broker Channel两分钟过期

private final static long BrokerChannelExpiredTime = 1000 * 60 * 2;

public void scanNotActiveBroker() {

    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();

    while (it.hasNext()) {

        Entry<String, BrokerLiveInfo> next = it.next();

        long last = next.getValue().getLastUpdateTimestamp();

        if ((last + BrokerChannelExpiredTime) < System.currentTimeMillis()) {

            RemotingUtil.closeChannel(next.getValue().getChannel());

            it.remove();

            log.warn("The broker channel expired, {} {}ms", next.getKey(), BrokerChannelExpiredTime);

            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

        }

    }

}

 

可以看出,如果两分钟内都没收到一个broker的心跳数据,则直接将其从brokerLiveTable中移除,注意,这还会导致该brokerbrokerAddrTable被删除,当然,如果该brokerMaster,则它的所有Slavebroker都将被删除。具体细节可以参看RouteInfoManageronChannelDestroy方法。

 

结论二:NameServer用来保存所有topic和该topic所有队列的列表。

我们注意到,topicQueueTablevalueQueueDataList,我们看看QueueData中的属性:

 

private String brokerName;  // broker的名称

private int readQueueNums;  // 读队列数量

private int writeQueueNums; // 写队列数量

private int perm;           // 读写权限

private int topicSynFlag;   // 同步复制还是异步复制标记

 

所以,你几乎可以在NameServer这里知道topic相关的所有信息,包括topic有哪些队列,这些队列在那些broker上等。

 

结论三:NameServer用来保存所有brokerFilter列表。

关于这一点,讨论broker的时候再细说。

 

DefaultRequestProcessorNameServer的默认请求处理器,他处理了定义在rocketmq-common模块中RequestCode定义的部分请求,比如注册broker、注销broker、获取topic路由、删除topic、获取brokertopic权限、获取NameServer的所有topic等。

 

在源代码中,NettyServerConfig类记录NameServer中的一些默认参数,比如端口、服务端线程数等,列出如下:

private int listenPort = 8888;

private int serverWorkerThreads = 8;

private int serverCallbackExecutorThreads = 0;

private int serverSelectorThreads = 3;

private int serverOnewaySemaphoreValue = 256;

private int serverAsyncSemaphoreValue = 64;

private int serverChannelMaxIdleTimeSeconds = 120;

 

这些都可以通过启动时指定配置文件来进行覆盖修改,具体可以参考NameServer的启动类NamesrvStartup的实现(没想到Apache还有对命令行提供支持的commons-cls的包)。

 

 

现在我们再回过头来看看RocketMQ为什么不使用ZooKeeperZooKeeper可以提供Master选举功能,比如Kafka用来给每个分区选一个broker作为leader,但对于RocketMQ来说,topic的数据在每个Master上是对等的,没有哪个Master上有topic上的全部数据,所以这里选举leader没有意义;RockeqMQ集群中,需要有构件来处理一些通用数据,比如broker列表,broker刷新时间,虽然ZooKeeper也能存放数据,并有一致性保证,但处理数据之间的一些逻辑关系却比较麻烦,而且数据的逻辑解析操作得交给ZooKeeper客户端来做,如果有多种角色的客户端存在,自己解析多级数据确实是个麻烦事情;既然RocketMQ集群中没有用到ZooKeeper的一些重量级的功能,只是使用ZooKeeper的数据一致性和发布订阅的话,与其依赖重量级的ZooKeeper,还不如写个轻量级的NameServerNameServer也可以集群部署,NameServerNameServer之间无任何信息同步,只有一千多行代码的NameServer稳定性肯定高于ZooKeeper,占用的系统资源也可以忽略不计,何乐而不为?当然,这些只是本人的一点理解,具体原因当然得RocketMQ设计和开发者来说。

  • 大小: 55.8 KB
分享到:
评论
3 楼 manzhizhen 2016-12-08  
styletang 写道
当然,如果该broker是Master,则它的所有Slave的broker都将被删除

这个的代码具体是在哪 能帮忙粘贴下吗?并没有看到是master会删除slave啊


  it.remove();
2 楼 styletang 2016-11-17  
当然,如果该broker是Master,则它的所有Slave的broker都将被删除

这个的代码具体是在哪 能帮忙粘贴下吗?并没有看到是master会删除slave啊
1 楼 街头诗人 2016-09-14  
持续跟踪关注博主关于MQ的动态

相关推荐

    RocketMQ概念 producer:生产者,消息发送者

    8. **消息不丢失策略**:RocketMQ通过生产者、存储和消费三个阶段的机制防止消息丢失: - 发送阶段:通过ack确认和重试策略。 - 存储阶段:异步刷盘可改为同步,配合磁盘阵列保证数据安全。 - 消费阶段:消费者...

    RocketMQ高级原理:深入剖析消息系统的核心机制(超详细整理讲解,值得收藏)

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中的消息传递。本文将深入剖析RocketMQ的核心机制,帮助读者理解其高级原理。 一、RocketMQ架构 RocketMQ采用主从架构,主要包括...

    启动脚本:nameServer+broker+dashboard

    启动脚本:nameServer+broker+dashboard。 改脚本可同时启动三个组件: 1.nameServer用于服务注册和发现; 2.broker用来存储客户端发送过来的消息数据; 3.dashboard方便用户在页面上浏览数据,并对数据进行管理。

    rocketmq-all-4.9.3-source

    - 消息顺序:RocketMQ支持消息的顺序传输,尤其在同一个Topic的同一个分区内的消息,可以保持严格的顺序。 - 消息回溯:Consumer可以设置消费位点,实现消息的回溯,方便排查问题。 - 容错机制:RocketMQ提供了消息...

    RocketMQ部署

    变量名:ROCKETMQ_HOME 变量值:MQ解压路径\MQ文件夹名 ###4、启动NAMESERVER Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqnamesrv.cmd’,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。 ###...

    rocketMQ(原版操作手册)

    1. **安装与部署**:RocketMQ的部署环境通常包括NameServer、Broker、Producer和Consumer四个角色。NameServer是服务发现和路由管理的角色,Broker负责存储消息,Producer负责发送消息,而Consumer则负责消费消息。...

    RocketMQ技术内幕.rar

    RocketMQ作为一款高效、稳定、可扩展的消息队列服务,广泛应用于大数据处理、实时计算、微服务架构等领域。以下是根据书名和描述所涵盖的一些关键知识点的详细说明: 一、RocketMQ简介 RocketMQ起源于阿里巴巴内部...

    rocketMq 的 docker-compose安装包

    nameserver: image: apache/rocketmq:4.9.3 command: sh bin/mqnamesrv ports: - "9876:9876" broker: image: apache/rocketmq:4.9.3 command: sh bin/mqbroker -c conf/broker.conf volumes: - ./conf/...

    Rocketmq_Chat:Android使用开源的RocketMq构建聊天应用,后台的代码后续会开源

    - **NameServer**:RocketMQ的服务注册与发现组件,负责管理所有Broker的信息。 - **Broker**:消息存储和转发的节点。 2. **Android集成RocketMQ:** - 使用RocketMQ Android SDK,进行环境配置和依赖引入。 -...

    RocketMQ集群安装部署

    通常,一个RocketMQ集群包括NameServer节点和Broker节点。NameServer负责管理和路由,而Broker负责消息的收发。在这个示例中,我们有三台机器和相应的RocketMQ角色分配: - 第一台机器(***.***.**.***)包含...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 是阿里巴巴中间件团队自研的一款高性能、高吞吐量、低延迟、高可用、高可靠(具备金融级稳定性)的分布式消息中间件。RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中...

    JAVA架构RocketMQ单机环境搭建pdf+架构师视频资源

    Java架构中的RocketMQ是一款由阿里巴巴开源的分布式消息中间件,广泛应用于大数据实时处理、订单系统、支付系统等场景。本文将详细介绍如何搭建RocketMQ的单机环境,并结合提供的PDF文档和架构师视频资源,深入理解...

    rocketMq-linux安装包

    RocketMQ 是一款高性能、分布式的消息中间件,尤其适用于大规模分布式系统中的消息传递。它由阿里巴巴开源,现已成为 Apache 的顶级项目。RocketMQ 提供了高可靠性和低延迟的消息传输服务,支持发布/订阅模型和点对...

    RocketMQ+Spring Cloud Stream环境搭建

    - **事务消息**:RocketMQ 支持事务消息,确保消息发送与业务操作的一致性。 - **消息回溯**:通过消费进度管理,可以实现消息的回溯,解决漏收问题。 - **消息重试与死信队列**:配置重试策略,当消息处理失败时,...

    深入探索RocketMQ源码:透视消息中间件的内核机制

    RocketMQ,作为一款高性能、高可用的消息中间件,被广泛应用于分布式系统设计中。本文将带领读者深入其源码,理解其核心组件与功能,从而更好地掌握RocketMQ在实际项目中的应用。 首先,我们从源码环境的搭建开始。...

    宝塔部署RocketMQ+可视化面板

    1. **安装Java环境**:RocketMQ运行需要Java环境,从提供的`jdk1.8.0_121`文件来看,我们需要先在宝塔环境中安装这个版本的JDK。这通常涉及到上传文件、解压、配置环境变量等步骤。 2. **下载RocketMQ**:从Apache...

    spring-boot-rocketmq-starter:适用于Apache RocketMQ的Spring Boot Starter

    Spring Boot RocketMQ入门 适用于Apache RocketMQ的开源Spring Boot Starter,可轻松使用RocketMQ进行开发。 快速开始 Maven依赖 &lt; groupId&gt;io.github.rhwayfun ...spring.rocketmq.nameServer =localhost

    RocketMQ实践:确保消息不丢失与顺序性的高效策略

    - **NameServer挂了的处理**:RocketMQ的NameServer是无状态的,集群部署可以防止单点故障,即使某个NameServer挂掉,其他NameServer仍能提供服务,保证消息传递不中断。 二、确保消息顺序 1. 保证消息有序的原因...

    RocketMQ 5.2.0

    Apache RocketMQ 是一款开源的分布式消息中间件,主要设计用于处理大规模实时数据传输。在5.2.0版本中,它提供了一系列优化和增强的功能,使其在高并发、低延迟、高可用性和可扩展性方面表现更加出色。本篇文章将...

    rocketmq集群搭建资源以及搭建详细步骤(全套)

    RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache基金会,它被广泛应用于分布式系统中的消息传递,提供高可用、高可靠的消息传输服务。在本压缩包中,你将找到搭建RocketMQ集群所需的全部资源,包括...

Global site tag (gtag.js) - Google Analytics