`
Technoboy
  • 浏览: 157063 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

RocketMQ原理解析-Name Server

阅读更多
1. 介绍
    Namesrv的功能,就相当于RPC中的注册中心。对于MQ而言,broker启动,将自身创建的topic等信息注册到Namesrv上。consumer和producer需要配置namesrv的地址,启动后,首先和namesrv建立长连接,并获取相应的topic信息(比如,哪些broker有topic路由信息),然后再和broker建立长连接。Namesrv本身无状态,可集群部署。所有的注册信息,都保存在namesrv的类似map内存数据结构中。

2. 启动
    Namesrv启动后,首先会加载KVConfig服务,然后监听本地端口(默认9876),等待客户端连接,并定时清理非活跃broker和打印KVConfig值信息。代码清晰易懂。
public boolean initialize() {

        this.kvConfigManager.load();


        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);


        this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();


        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        return true;
    }


3. 请求处理
    Namesrv能处理的所有请求都在DefaultRequestProcessor#processRequest方法:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}",//
                    request.getCode(), //
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
                    request);
        }

        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                }
                else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            default:
                break;
        }
        return null;
    }

    broker启动后,会调用REGISTER_BROKER注册topic等信息,其心跳也是直接调用REGISTER_BROKER的,broker下线会调用UNREGISTER_BROKER方法。

4. 数据结构
    Namesrv的数据都保存在RouteInfoManager类中:
private final static long BrokerChannelExpiredTime = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    可以看到,broker非活跃时间为2分钟。

5. KVConfig作用
    猜测MQ作者,想通过KVConfig做类似configserver的简易功能,将信息配置在namesrv后,客户端获取后进行业务处理。其数据结构为:
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
            new HashMap<String, HashMap<String, String>>();

    KVConfig的数据最终会保存在kvConfig.json文件中,每次的put,delete操作都会直接修改kvConfig.json文件,并在namesrv启动时,加载数据。

6. 控制台
    MQ的mqadmin脚本可以直接获取namesrv存储的信息,在MQ的bin目录下:

    这里面,最重要的一个命令为wipeWritePerm,该命令可清除指定broker的写权限。当某台broker需要重启,将会导致向这台broker发消息失败,消息消费失败。通过该命令清除broker写权限后,由于producer只会向带有写权限的master broker发消息,当producer从namesrv更新(定时30s更新topic路由信息)到该broker无写权限后,将会将消息发往其他broker,然后再根据业务情况(譬如30分钟后),让consumer消费完消息再停掉该broker。
  • 大小: 160.7 KB
分享到:
评论

相关推荐

    rocketmq-console-ng-2.0.0.jar

    rocketmq-console的源码打出来的jar包,可直接运行,默认端口为8080,如果要指定端口,将jar包里面的application.properties拿出来放在当前路径即可,如果配置文件没有填写Name Server的话,可以在启动项目时指定...

    rocketMq 的 docker-compose安装包

    RocketMQ 是一款高性能、分布式的消息中间件,常用于大规模分布式系统中的消息传递。Docker Compose 是一个用于定义和运行多容器 Docker 应用的工具,它可以简化部署和管理复杂应用的过程。在这个“rocketMq 的 ...

    rocketmq-client-nodejs:Apache RocketMQ Node.js客户端

    $ npm install --save apache-rocketmq 例子 您可以查看和以获得快速。 用法 首先需要此软件包。 const { Producer , PushConsumer } = require ( "apache-rocketmq" ) ; 制片人 建设者 new Producer ( groupId [ ,...

    rocketmq-flume:用于RocketMQ与Flume-ng之间的消息接收和投递

    HOME/lib目录中(具体包会在后面描述)SinkSink配置说明配置项必填默认值说明namesrvAddr必填nullName Server地址,遵循RocketMQ配置方式producerGroup可选DEFAULT_PRODUCERProducer分组topic必填nullTopic名称tags可...

    rocketmq-console.war 3.2.6 管控台war包

    2. `instanceName`: RocketMQ实例的名称,用于区分不同的RocketMQ服务实例。 配置完成后,重启Web容器,你就可以通过浏览器访问管控台了,一般URL格式为`http://your_server_ip:your_server_port/rocketmq-console`...

    docker-mailserver 用来自己搭建邮件服务器的 docker 镜像

    tvial/docker-mailserver邮箱服务器。可以自己在小网中搭建一个邮件服务器自己玩。启动方法 docker run --name forsaken-mail -itd -p 2255:25 -p 3000:3000 tvial/docker-mailserver:latest

    rocketMQ 4.1.0 linux

    3. **配置RocketMQ**:修改RocketMQ的配置文件,主要涉及`conf/broker.conf`,`conf/name-server.conf`等。例如,设置broker的ID、端口、日志路径等。 4. **启动RocketMQ**:使用`bin/mqnamesrv`启动NameServer,...

    使用flink-connector-sqlserver-cdc 2.3.0把数据从SQL Server实时同步到MySQL中

    本话题将详细讲解如何利用Flink的SQL Server Change Data Capture (CDC) 连接器版本2.3.0,将SQL Server中的数据实时同步到MySQL数据库。 首先,让我们了解什么是CDC。CDC是一种数据库技术,它能够捕获数据库中的...

    aws-name-server, 允许你按实例名称查找ec2实例的DNS服务器.zip

    aws-name-server, 允许你按实例名称查找ec2实例的DNS服务器 一个DNS服务器,它按名称提供你的。用法aws-name-server --domain aws.bugsnag.com --aws-region us-east-1 --aws-access-key-id &l

    rocketmq安装部署

    等待一段时间,直到控制台输出`The Name Server is running now...`,表示NameServer已经启动成功。 **5. 启动Broker** RocketMQ支持集群模式,这里我们先启动一个简单的单机Broker。 ```bash nohup sh bin/mq...

    rocketmq-flume-master:flume收集日志发送到rocketmq

    rocketmq-flume Source&Sink ...Name Server地址,遵循RocketMQ配置方式 producerGroup 可选 DEFAULT_PRODUCER Producer分组 topic 必填 null Topic名称 tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式

    RocketMQ Operator-K8s平台自动化部署工具介绍.pptx

    3. **自定义资源** - 如NameService和Broker,它们分别代表RocketMQ的Name Server和Broker节点,通过定义对应的YAML配置文件进行管理。 4. **控制器** - Broker Controller和Name Service Controller协同工作,例如...

    RocketMQ Operator-K8s平台自动化部署工具介绍.pdf

    - **Name Service Controller** 通过集成RocketMQ Admin Tool,可以处理Name Server的动态扩展,并确保所有Broker能感知到变更。 ### RocketMQ-Operator基本使用 1. **环境准备** 首先需要一个Kubernetes集群...

    sqoop-sqlserver-1.0.tar.gz

    Apache Sqoop 是一个工具,主要用于在关系型数据库(如 SQL Server)和 Apache Hadoop 之间进行数据迁移。这个名为 "sqoop-sqlserver-1.0.tar.gz" 的压缩包文件,显然包含了针对 SQL Server 的特定连接器,使得 ...

    RocketMQ Operator-K8s平台自动化部署工具.pdf

    RocketMQ Operator使用CRD来定义RocketMQ集群的资源配置,包括Name Service和Broker资源配置。CRD提供了灵活的资源配置方式,用户可以根据需要定义不同的资源配置。 控制器(Controller) Controller是RocketMQ ...

    ScanZxing-master.zip

    【描述】描述中提到,这个源码包包含了"core-3.0.0.jar",这是Zxing的核心库,负责实际的条码和二维码解析工作。该版本适用于Android 5.1 (Lollipop) 及以上版本,特别是6.0 (Marshmallow),并且在5.1系统上运行良好...

    screenshot-api-server:使用node express和puppeteer搭建的WEB截图API服务 网页截图 & pdf 生成 API服务 & docker镜像

    docker run -p 3000:3000 -td --rm -v ${PWD}:/screenshot-api-server/public --name=screenshot-api-server wuxue107/screenshot-api-server 本地使用 yarn && yarn start API 接口 截图 单张图片截取 API: 请求...

    实现SqlServer数据库批量添加表注释和列注释源代码

    实现SqlServer数据库批量添加表注释和列注释源代码 配置yml类型的配置文件, 格式如下: table: - name: SysUser note: 系统用户 column: - name: UserID note: 用户表自增ID - name: UserCode note: 用户...

    数据库实战案例 - SQL server 实战题目汇总.rar

    查找所有已经分配部门的员工的last_name和first_name查找所有员工的last name和first_name以及对应部门编号dept_no查找所有员工入职时候的薪水情况查找薪水涨幅超过15次的员工号emp_no以及其对应的涨幅次数t ...

    03-05-26-RocketMQ基本原理分析1

    - **Producer**:集群部署,与Name Server建立长连接,获取Topic路由信息并连接Master发送消息。 - **Consumer**:同样组成集群,从Name Server获取路由信息,连接Master或Slave消费消息。 4. **部署与启动**: ...

Global site tag (gtag.js) - Google Analytics