`
jameswxx
  • 浏览: 776375 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

RocketMQ topic路由

 
阅读更多

 

原创文章,转载请注明出处:http://jameswxx.iteye.com/blog/2096446
这里以消费者为例说明。一组消费者要消费某个topic,得先知道该topic分布在哪些broker上,某个broker上的topic分布可能会变化,一旦变化,生产者和消费者应该都能被通知到。通知模式有推和拉两种,客户端都是采取拉的模式,所以broker如有变化,通知都是有延迟的。
 
一 什么时候启动topic路由获取任务
两个地方:
1 首先是DefaultMQPushConsumerImpl启动时,见DefaultMQPushConsumerImpl的start方法里的this .updateTopicSubscribeInfoWhenSubscriptionChanged();
2 另外DefaultMQPushConsumerImpl的start方法也启动了MQClientInstance,MQClientInstance的start方法里调用了startScheduledTask()方法,该方法启动了获取路由的定时任务。
        // 定时从Name Server获取Topic路由信息
        this.scheduledExecutorService .scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this .updateTopicRouteInfoFromNameServer();
                }
                catch (Exception e) {
                    log.error( "ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig .getPollNameServerInteval(), TimeUnit.MILLISECONDS );
 
 
二 每隔多久获取一次
很简单,看定时任务每隔多久执行一次就知道了,这里的间隔参数是this.clientConfig .getPollNameServerInteval()。
ClientConfig的pollNameServerInteval 定义如下:
private int pollNameServerInteval = 1000 * 30;
DefaultMQPushConsumer继承了ClientConfig,pollNameServerInteval 默认是30秒,显然,这个时间是可以自己定义的,通过DefaultMQPushConsumer的setPollNameServerInteval()方法。
 
三 获取路由过程
MQClientInstance的updateTopicRouteInfoFromNameServer()方法,该方法最终会调用下面这个方法,需要注意,对于消费者而言,isDefault参数永远是false。
  public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv .tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS )) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                       //此处省略不必要的信息,对于消费者,分支不会走到这里来,因为isDefault为false,且生产者肯定为空
                    }
                    else {
                        topicRouteData =
                                this.mQClientAPIImpl .getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    //此处省略无关语句
                }
                catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX )
                            && !topic.equals(MixAll.DEFAULT_TOPIC )) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception" , e);
                    }
                }
                finally {
                    this.lockNamesrv .unlock();
                }
            }
            else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LockTimeoutMillis);
            }
        }
        catch (InterruptedException e) {
            log.warn( "updateTopicRouteInfoFromNameServer Exception", e);
        }
 
        return false ;
  }
 
其实最终都是通过this .mQClientAPIImpl .getTopicRouteInfoFromNameServer(topic, 1000 * 3);得到的。
 
 
 
四 客户端与nameserver的连接关系
broker与所有nameserver都是长连接,如有变化,则向所有nameserver都发送消息。但是生产者和消费者只是跟某一台nameserver保持联系。设定一个场景,如果某个broker的topic配置发生了变化,它向所有nameserver发布通知,但是此时如果某一台nameserver推送失败(超时或者挂掉了),则nameserver集群之间的信息是不完整的,因为挂掉的那台nameserver没有得到最新变化。
由此衍生三个问题:
1 如果该nameserver不是挂掉,只是那一瞬间没有响应,那么待可正常服务时,刚才那个borker发生的变化应该能生效,不应该被丢弃,否则nameserver之间的数据是不同步的。
  解决方案:broker是定时向所有nameserver发送自己的注册信息的,如果当时某台nameserver挂掉重启或者超时,没关系,下次仍然会接受到上次没接收到的broker信息
2 如果真的挂掉了,但是很快又恢复了,因为borker和nameserver保持的是长连接,显然挂掉重新启动后,broker与nameserver的长连接无效了,应该能自动重连
  getAndCreateChannel方法分析
3 只要某个nameserver不可用,消费者应该能failover,每次应该都检查长连接是否还有效,若无效则自动连接其他nameserver。
  getAndCreateNameserverChannel()方法分析
 
带着这个疑问,看看this .mQClientAPIImpl .getTopicRouteInfoFromNameServer(topic, 1000 * 3)方法。这个方法向nameserver发起调用,获取路由结果
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode. GET_ALL_TOPIC_LIST_FROM_NAMESERVER null);
RemotingCommand response = this .remotingClient .invokeSync( null, request, timeoutMillis);
 
重点在于remotingClient .invokeSync方法,如下
@Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingSendRequestException,
            RemotingTimeoutException {
        //这里获取连接,该方法里面会做连接的检查和恢复
        final Channel channel = this .getAndCreateChannel(addr);
 
        //最后如果还是不是有效连接,则关闭连接,抛出异常
        if (channel != null && channel.isActive()) {
            try {
                if (this .rpcHook != null) {
                    this .rpcHook .doBeforeRequest(addr, request);
                }
                RemotingCommand response = this .invokeSyncImpl(channel, request, timeoutMillis);
                if (this .rpcHook != null) {
                    this .rpcHook .doAfterResponse(request, response);
                }
                return response;
            }
            catch (RemotingSendRequestException e) {
                log .warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this .closeChannel(addr, channel);
                throw e;
            }
            catch (RemotingTimeoutException e) {
                log .warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                // 超时异常如果关闭连接可能会产生连锁反应
                // this.closeChannel( addr, channel);
                throw e;
            }
        }
        else {
            this .closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
 
这个方法大体分为两步,第一步获取连接,第二步通过连接发送请求,获取连接当然是getAndCreateChannel方法了,getAndCreateChannel方法非常重要,它包含了客户端对nameserver的failover,也包含了自动重连功能,对于客户端,传入的addr参数都是null,所以一直会走到getAndCreateNameserverChannel()方法。
   private Channel getAndCreateChannel( final String addr) throws InterruptedException {
        //无论是producer还是consumer,传进来的addr参数都是null
        if (null == addr)
            return getAndCreateNameserverChannel();
 
        //因为客户端传入的addr是null,所以客户端不会走到这里来,只有broker才会走到这里来,因为broker传入的addr不为null
        ChannelWrapper cw = this .channelTables .get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
 
        //注意,如果和某个addr的连接不OK了,则再向该nameserver发起重连
        return this .createChannel(addr);
    }
 
createChannel方法很简单,无非就是创建连接嘛,就不细看了,分析下getAndCreateNameserverChannel(),以下是该方法大致过程:
因为客户端都是与某一台nameserver长连接,因此长连接一旦选定,后面不会变化,除非nameserver挂掉,所以已建立的长连接要保存起来。下面这段逻辑就是如此。
       String addr = this .namesrvAddrChoosed .get();
        if (addr != null) {
            ChannelWrapper cw = this .channelTables .get(addr);
             //注意这里,虽然长连接已经建立了,但是每次调用时,仍然要通过“cw != null && cw.isOK()”检查连接是否OK。
             if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }
 
如果连接没有建立或连接已经断开,则继续往下,真正创建连接时需要加锁的
 if ( this.lockNamesrvChannel .tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS ))
下面的代码都是在这个if块里面
这里又执行了一边上面的获取连接并检测的代码,可以连接,因为有时候连接只是偶尔不OK的
     addr = this. namesrvAddrChoosed .get();
                if (addr != null) {
                    ChannelWrapper cw = this .channelTables .get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }
 
接着往下,这段代码非常重要
namesrvIndex指示了当前跟哪个nameserver发生连接,初始值是个随机数,跟nameserver数量取模,走到这一步,要么是首次发起调用,之前连接还未创建现在要创建了,或者是已创建的连接无效了要连接下一个nameserver,就是“cw.isOK()”为false。
 
        if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this .namesrvIndex .incrementAndGet();
                        index = Math. abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);
 
                        this .namesrvAddrChoosed.set(newAddr);
                        Channel channelNew = this .createChannel(newAddr);
                        if (channelNew != null)
                            return channelNew;
                    }
                }

 

分享到:
评论

相关推荐

    Java RocketMQ 路由注册与删除的实现

    其中,topicQueueTable存储了Topic消息队列路由信息,消息发送时根据路由表进行负载均衡;brokerAddrTable存储了Broker基础信息,包含brokerName、所属集群名称、主备Broker地址;clusterAddrTable存储了Broker集群...

    rocketmq安装包及RocketMQ 控制台JAR包

    3. **配置与连接**:在控制台上,你需要配置RocketMQ集群的NameServer地址,然后可以查看和管理你的主题(Topic)、队列(Queue)等资源,监控消费状态,进行消息查询等操作。 在实际应用中,RocketMQ支持多种部署...

    SpringBoot整合RocketMq,rocketMq

    5. **NameServer**:路由注册中心,负责保存Topic和Queue的映射关系,供Producer和Consumer查找。 6. **Broker**:RocketMQ服务器,存储消息并提供服务。 **四、高级特性** 1. **发布/订阅模型**:支持点对点模型...

    rocketMQ常用命令

    获取topic路由信息的命令是: ``` sh mqadmin topicRoute -n ***.***.**.***:9876 -t frist ``` 通过`topicRoute`子命令,可以获取到消息从发送到接收过程中的路由信息,这对于消息的追踪和调试非常有用。 9. ...

    rocketMQ-master.zip

    5. Message Queue:消息队列是RocketMQ中的基本概念,每个Topic可以包含多个Message Queue,消息会被均匀分配到不同的Message Queue中。 6. Topic:主题是消息的分类,Producer发送的消息需要指定一个Topic,...

    rocketmq_namesrv.zip

    Namesrv(Name Server)是RocketMQ的重要组成部分,它扮演着消息路由中心的角色,负责存储和管理Topic路由信息,使得Producer和Consumer可以高效地找到彼此进行通信。 在RocketMQ Namesrv的设计中,主要有以下几个...

    尚硅谷完整的关于rocketmq的学习视频整理笔记

    - **NameServer**:RocketMQ 的路由注册中心,负责维护主题与 Broker(消息服务器)之间的映射关系。 - **Broker**:实际存储消息的服务器,负责接收生产者发送的消息,并将消息分发给消费者。 - **Group ID**:标识...

    rocketmq安装包,rocketmq-all-5.1.3-bin-release

    NameServer是服务发现和路由管理的角色,Broker负责存储和转发消息,Producer是消息的生产者,而Consumer则是消息的消费者。 2. **NameServer**:NameServer是一个轻量级的服务,不保存任何业务数据,只负责维护...

    rocketmq-console后台管理jar包

    NameServer是RocketMQ中的一个关键组件,负责存储所有Topic和Broker的元数据,客户端通过NameServer获取到消息的路由信息。 3. **启动与运行** 解压压缩包后,你可以通过Java的JAR命令来运行RocketMQ Console。...

    rocketmq中间件

    - NameServer是RocketMQ的核心组件,负责路由信息的管理,不持久化数据,提供服务发现和路由查询功能。 4. **Producer**: - 生产者是发送消息的客户端,它需要向NameServer注册并获取Topic的路由信息,然后将...

    rocketmq.zip

    1. NameServer:路由中心,负责维护Topic与Broker的映射关系,提供Topic路由信息查询。 2. Broker:消息服务器,存储和转发消息。 3. Producer:消息生产者,负责发布消息到RocketMQ系统。 4. Consumer:消息消费者...

    rocketmq文件包

    在RocketMQ中,NameServer是一个轻量级的服务注册与发现组件,它负责维护Producer、Consumer以及Topic的路由信息。Producer在发送消息前需要先向NameServer注册,获取Consumer和Topic的相关信息。Consumer通过Pull或...

    rocketmq-4.7.0.zip

    5. **NameServer**:NameServer是服务注册与发现的组件,它不存储任何持久化数据,负责维护Broker和Topic的路由信息。Producer和Consumer通过NameServer找到目标队列,进行消息的发送和接收。 6. **Broker**:...

    RocketMQ 3.5.8 已编译版本

    1. **NameServer**:NameServer是RocketMQ的一个轻量级注册中心,它并不存储消息,而是存储Topic路由信息。Producer和Consumer在启动时会向NameServer注册,并在发送消息或消费消息时查询Topic的路由信息,以便找到...

    全面解剖RocketMQ和项目实战-day4-part2.7z

    NameServer是RocketMQ的核心组件之一,负责存储和管理Topic路由信息。启动步骤通常包括初始化配置、启动监听线程、加载本地持久化的路由信息等。了解NameServer的启动过程对维护和监控RocketMQ集群至关重要。 5. *...

    消息中间件rocketmq原理解析.pdf

    如果producer本地没有这个topic的路由信息,则会向nameserver查询这个topic的路由信息(TopicPublishInfo),并将其缓存到本地。 - producer会定时从nameserver更新topic的路由信息,并向broker发送心跳信息,确保...

    RocketMQ控制台

    2. **Topic管理**:创建、删除、修改Topic配置,如分区数量、队列数量等,同时可以查看Topic的路由信息和消费进度。 3. **Producer与Consumer管理**:管理消息的生产者(Producer)和消费者(Consumer),包括注册...

    RocketMQ技术讲解V2.0

    2、讲解NameServer的启动、注册Broker、客户端查询Topic的路由信息等功能; 3、讲解Broker的启动、注册、处理Producer发送消息、处理Consumer拉取消息、事务消息的处理等功能; 4、讲解Producer端的启动、发送普通...

    alibaba-rocketmq3.5.8 tar包

    1. NameServer:NameServer是一个轻量级的服务注册与发现组件,它并不存储数据,而是负责维护Topic路由信息,Producer和Consumer通过NameServer获取到Topic的路由信息,从而进行消息发送和消费。 2. Broker:Broker...

Global site tag (gtag.js) - Google Analytics