1. 介绍
Namesrv的功能,就相当于RPC中的注册中心。对于MQ而言,broker启动,将自身创建的topic等信息注册到Namesrv上。consumer和producer需要配置namesrv的地址,启动后,首先和namesrv建立长连接,并获取相应的topic信息(比如,哪些broker有topic路由信息),然后再和broker建立长连接。Namesrv本身无状态,可集群部署。所有的注册信息,都保存在namesrv的类似map内存数据结构中。
2. 启动
Namesrv启动后,首先会加载KVConfig服务,然后监听本地端口(默认9876),等待客户端连接,并定时清理非活跃broker和打印KVConfig值信息。代码清晰易懂。
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
3. 请求处理
Namesrv能处理的所有请求都在DefaultRequestProcessor#processRequest方法:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",//
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
default:
break;
}
return null;
}
broker启动后,会调用REGISTER_BROKER注册topic等信息,其心跳也是直接调用REGISTER_BROKER的,broker下线会调用UNREGISTER_BROKER方法。
4. 数据结构
Namesrv的数据都保存在RouteInfoManager类中:
private final static long BrokerChannelExpiredTime = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
可以看到,broker非活跃时间为2分钟。
5. KVConfig作用
猜测MQ作者,想通过KVConfig做类似configserver的简易功能,将信息配置在namesrv后,客户端获取后进行业务处理。其数据结构为:
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
new HashMap<String, HashMap<String, String>>();
KVConfig的数据最终会保存在kvConfig.json文件中,每次的put,delete操作都会直接修改kvConfig.json文件,并在namesrv启动时,加载数据。
6. 控制台
MQ的mqadmin脚本可以直接获取namesrv存储的信息,在MQ的bin目录下:
这里面,最重要的一个命令为wipeWritePerm,该命令可清除指定broker的写权限。当某台broker需要重启,将会导致向这台broker发消息失败,消息消费失败。通过该命令清除broker写权限后,由于producer只会向带有写权限的master broker发消息,当producer从namesrv更新(定时30s更新topic路由信息)到该broker无写权限后,将会将消息发往其他broker,然后再根据业务情况(譬如30分钟后),让consumer消费完消息再停掉该broker。
- 大小: 160.7 KB
分享到:
相关推荐
rocketmq-console的源码打出来的jar包,可直接运行,默认端口为8080,如果要指定端口,将jar包里面的application.properties拿出来放在当前路径即可,如果配置文件没有填写Name Server的话,可以在启动项目时指定...
RocketMQ 是一款高性能、分布式的消息中间件,常用于大规模分布式系统中的消息传递。Docker Compose 是一个用于定义和运行多容器 Docker 应用的工具,它可以简化部署和管理复杂应用的过程。在这个“rocketMq 的 ...
$ npm install --save apache-rocketmq 例子 您可以查看和以获得快速。 用法 首先需要此软件包。 const { Producer , PushConsumer } = require ( "apache-rocketmq" ) ; 制片人 建设者 new Producer ( groupId [ ,...
HOME/lib目录中(具体包会在后面描述)SinkSink配置说明配置项必填默认值说明namesrvAddr必填nullName Server地址,遵循RocketMQ配置方式producerGroup可选DEFAULT_PRODUCERProducer分组topic必填nullTopic名称tags可...
2. `instanceName`: RocketMQ实例的名称,用于区分不同的RocketMQ服务实例。 配置完成后,重启Web容器,你就可以通过浏览器访问管控台了,一般URL格式为`http://your_server_ip:your_server_port/rocketmq-console`...
tvial/docker-mailserver邮箱服务器。可以自己在小网中搭建一个邮件服务器自己玩。启动方法 docker run --name forsaken-mail -itd -p 2255:25 -p 3000:3000 tvial/docker-mailserver:latest
3. **配置RocketMQ**:修改RocketMQ的配置文件,主要涉及`conf/broker.conf`,`conf/name-server.conf`等。例如,设置broker的ID、端口、日志路径等。 4. **启动RocketMQ**:使用`bin/mqnamesrv`启动NameServer,...
本话题将详细讲解如何利用Flink的SQL Server Change Data Capture (CDC) 连接器版本2.3.0,将SQL Server中的数据实时同步到MySQL数据库。 首先,让我们了解什么是CDC。CDC是一种数据库技术,它能够捕获数据库中的...
aws-name-server, 允许你按实例名称查找ec2实例的DNS服务器 一个DNS服务器,它按名称提供你的。用法aws-name-server --domain aws.bugsnag.com --aws-region us-east-1 --aws-access-key-id &l
等待一段时间,直到控制台输出`The Name Server is running now...`,表示NameServer已经启动成功。 **5. 启动Broker** RocketMQ支持集群模式,这里我们先启动一个简单的单机Broker。 ```bash nohup sh bin/mq...
rocketmq-flume Source&Sink ...Name Server地址,遵循RocketMQ配置方式 producerGroup 可选 DEFAULT_PRODUCER Producer分组 topic 必填 null Topic名称 tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式
3. **自定义资源** - 如NameService和Broker,它们分别代表RocketMQ的Name Server和Broker节点,通过定义对应的YAML配置文件进行管理。 4. **控制器** - Broker Controller和Name Service Controller协同工作,例如...
- **Name Service Controller** 通过集成RocketMQ Admin Tool,可以处理Name Server的动态扩展,并确保所有Broker能感知到变更。 ### RocketMQ-Operator基本使用 1. **环境准备** 首先需要一个Kubernetes集群...
Apache Sqoop 是一个工具,主要用于在关系型数据库(如 SQL Server)和 Apache Hadoop 之间进行数据迁移。这个名为 "sqoop-sqlserver-1.0.tar.gz" 的压缩包文件,显然包含了针对 SQL Server 的特定连接器,使得 ...
RocketMQ Operator使用CRD来定义RocketMQ集群的资源配置,包括Name Service和Broker资源配置。CRD提供了灵活的资源配置方式,用户可以根据需要定义不同的资源配置。 控制器(Controller) Controller是RocketMQ ...
【描述】描述中提到,这个源码包包含了"core-3.0.0.jar",这是Zxing的核心库,负责实际的条码和二维码解析工作。该版本适用于Android 5.1 (Lollipop) 及以上版本,特别是6.0 (Marshmallow),并且在5.1系统上运行良好...
docker run -p 3000:3000 -td --rm -v ${PWD}:/screenshot-api-server/public --name=screenshot-api-server wuxue107/screenshot-api-server 本地使用 yarn && yarn start API 接口 截图 单张图片截取 API: 请求...
实现SqlServer数据库批量添加表注释和列注释源代码 配置yml类型的配置文件, 格式如下: table: - name: SysUser note: 系统用户 column: - name: UserID note: 用户表自增ID - name: UserCode note: 用户...
查找所有已经分配部门的员工的last_name和first_name查找所有员工的last name和first_name以及对应部门编号dept_no查找所有员工入职时候的薪水情况查找薪水涨幅超过15次的员工号emp_no以及其对应的涨幅次数t ...
- **Producer**:集群部署,与Name Server建立长连接,获取Topic路由信息并连接Master发送消息。 - **Consumer**:同样组成集群,从Name Server获取路由信息,连接Master或Slave消费消息。 4. **部署与启动**: ...