`
wbj0110
  • 浏览: 1587708 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

MetaQ技术内幕——源码分析(五)

阅读更多

Broker接收从Producer(Client端)发送的消息,也能够返回消息到Consumer(Client),对于Broker来说,就是网络输入输出流的处理。

Broker使用淘宝内部的gecko框架作为网络传输框架,gecko是一个NIO框架,能够支持一下特性:

1、 可自定义协议,协议可扩展、紧凑、高效

2、 可自动管理重连,重连由客户端发起

3、 需进行心跳检测,及时发现连接失效

4、 请求应答模型应当支持同步和异步 

5、 连接的分组管理,并且在重连情况下能正确处理连接的分组

6、 请求的发送应当支持四种模型:  (1) 向单个连接发起请求  (2) 向分组内的某个连接发起请求,这个选择策略可定义 (3) 向分组内的所有连接发起请求  (4) 向多个分组发起请求,每个分组的请求遵循(2) 

7、 编程模型上尽量做到简单、易用、透明,向上层代码屏蔽网络处理的复杂细节。

8、 高度可配置,包括网络参数、服务层参数等 

9、 高度可靠,正确处理网络异常,对内存泄露等隐患有预防措施

10、 可扩展

如果时间允许的话,笔者也可以做一下 gecko的源码分析

由于网络模块与其他模块关联性极强,不像存储模块可以独立分析,所以此篇文章开始将从全局开始分析Broker。

先看看Broker的启动类MetamorphosisStartup:

public static void main(final String[] args) {
final String configFilePath = getConfigFilePath(args);
final MetaConfig metaConfig = getMetaConfig(configFilePath);
final MetaMorphosisBroker server = new MetaMorphosisBroker(metaConfig);
server.start();
}

从MetamorphosisStartup可以看出其逻辑是先加载了配置文件,然后构造了MetaMorphosisBroker实例,并调用该实例的start方法,MetaMorphosisBroker才是Broker真正的启动类。

看看真正的启动类MetaMorphosisBroker, MetaMorphosisBroker实现接口MetaMorphosisBrokerMBean,可以通过 jmx 协议关闭MetaMorphosisBroker。看看在构造MetaMorphosisBroker实例的时候干了些什么事情。

public MetaMorphosisBroker(final MetaConfig metaConfig) {
        //配置信息
this.metaConfig = metaConfig;
        //Broker对外提供的nio Server
this.remotingServer = newRemotingServer(metaConfig);
       //线程池管理器,主要是提供给nio Server在并发环境下可以使用多线程处理,提高性能
        this.executorsManager = new ExecutorsManager(metaConfig);
       //全局唯一的id生成器		
        this.idWorker = new IdWorker(metaConfig.getBrokerId());
       //存储模块管理器
        this.storeManager = new MessageStoreManager(metaConfig, this.newDeletePolicy(metaConfig));
       //统计模块管理器
this.statsManager = new StatsManager(this.metaConfig, this.storeManager, this.remotingServer);
      //zookeeper客户端,前面介绍过metaq使用zookeeper作为中间协调者,Broker会将自己注册到zookeeper上,也会从zookeeper查询相关数据
this.brokerZooKeeper = new BrokerZooKeeper(metaConfig);
      //网络输入输出流处理器		
        final BrokerCommandProcessor next = new BrokerCommandProcessor(this.storeManager, this.executorsManager,          this.statsManager, this.remotingServer, metaConfig, this.idWorker, this.brokerZooKeeper);
     //事务存储引擎
JournalTransactionStore transactionStore = null;
try {
transactionStore = new JournalTransactionStore(metaConfig.getDataLogPath(), this.storeManager, metaConfig);
} catch (final Exception e) {
throw new MetamorphosisServerStartupException("Initializing transaction store failed.", e);
}
      //带事务处理的网络输入输出流处理器,设计采用了责任链的设计模式,使用事务存储引擎存储中间结果
this.brokerProcessor = new TransactionalCommandProcessor(metaConfig, this.storeManager, this.idWorker, next, transactionStore, this.statsManager);
      //钩子,JVM退出钩子,钩子实现在JVM退出的时候尽力正确关闭	MetaMorphosisBroker	   
        this.shutdownHook = new ShutdownHook();
      //注册钩子
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
     //注册MBean,因为MetaMorphosisBroker实现MetaMorphosisBrokerMBean接口,可以将自己作为MBean注册到MBeanServer
MetaMBeanServer.registMBean(this, null);
}

前面我们知道在启动的时候会调用MetaMorphosisBroker的start() 方法,来看看start()方法里究竟做了些什么事情

public synchronized void start() {
//判断是否已经启动,如果已经启动,则不在启动
if (!this.shutdown) {
return;
}
this.shutdown = false;
//初始化存储模块,加载验证已有数据
this.storeManager.init();
//初始化线程池
this.executorsManager.init();
//初始化统计模块
this.statsManager.init();
//向nio server注册处理器
this.registerProcessors();
try {
//NIO server启动 
this.remotingServer.start();
} catch (final NotifyRemotingException e) {
throw new MetamorphosisServerStartupException("start remoting server failed", e);
}
try {
//在/brokers/ids下创建临时节点,名称为节点Id
this.brokerZooKeeper.registerBrokerInZk();
//如果为master节点,则创建/brokers/ids/master_config_checksum节点		
                this.brokerZooKeeper.registerMasterConfigFileChecksumInZk();
//添加主题列表监听器,监听主题列表变化,如果主题列表发生变化,则向zookeeper重新注册主题和分区信息			
               this.addTopicsChangeListener();
//注册主题和分区信息
        this.registerTopicsInZk();
//设置标志位主题和分区注册成功
this.registerZkSuccess = true;
} catch (final Exception e) {
this.registerZkSuccess = false;
throw new MetamorphosisServerStartupException("Register broker to zk failed", e);
}
log.info("Starting metamorphosis server...");
//初始化输入输出流处理器
this.brokerProcessor.init();
log.info("Start metamorphosis server successfully");
}

下面,让我们具体来看看start()方法里调用的MetaMorphosisBroker每一个方法,首先是registerProcessors()方法:

private void registerProcessors() {
//注册Get命令处理器
this.remotingServer.registerProcessor(GetCommand.class, new GetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor()));
//注册Put命令的处理器		
this.remotingServer.registerProcessor(PutCommand.class, new PutProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor()));
//查询最近有效的offset处理器
this.remotingServer.registerProcessor(OffsetCommand.class, new OffsetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor()));
//心跳检测处理器
this.remotingServer.registerProcessor(HeartBeatRequestCommand.class, new VersionProcessor(this.brokerProcessor));
//注册退出命令处理器
this.remotingServer.registerProcessor(QuitCommand.class, new QuitProcessor(this.brokerProcessor));
//注册统计信息查询处理器
this.remotingServer.registerProcessor(StatsCommand.class, new StatsProcessor(this.brokerProcessor));
//注册事务命令处理器
this.remotingServer.registerProcessor(TransactionCommand.class, new TransactionProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor()));
}

依赖于不同的处理器,可以将不同的请求进行处理并返回结果。接下来就是addTopicsChangeListener()方法。

//addTopicsChangeListener方法比较简单,主要简单配置的topic列表的变化,前面介绍过MetaConfig提供监听机制监听topic列表的变化,该方法向MetaConfig注册一个匿名监听器监听topic列表变化,一旦发生变化则向zookeeper进行注册
private void addTopicsChangeListener() {
// 监听topics列表变化并注册到zk
this.metaConfig.addPropertyChangeListener("topics", new PropertyChangeListener() {
public void propertyChange(final PropertyChangeEvent evt) {
try {
MetaMorphosisBroker.this.registerTopicsInZk();
} catch (final Exception e) {
log.error("Register topic in zk failed", e);
}
}
});
}

MetaMorphosisBroker在启动过程中被调用的方法还有registerTopicsInZk()方法,registerTopicsInZk完成向zookeeper注册topic和分区信息功能。在分析方法之前,有必要插入分析一下Broker在zk上注册的结构,代码在common工程的类MetaZookeeper,该结构是Broker和Client共享的。 

Zk中有4中类型的根目录,分别是:

1) /consumers:存放消费者列表以及消费记录,消费者列表主要是以组的方式存在,结构主要如下:

       /consumers/xxGroup/ids/xxConsumerId:DATA (“:”后的DATA表示节点xxConsumerId对应的数据) 组内消费者Id;DATA为订阅主题列表,以”,”分隔

       /consumers/xxGroup/offsets/xxTopic/分区N:DATA  组内主题分区N的消费进度;DATA为topic下分区N具体进度值

       /consumers/xxGroup/owners/xxTopic/分区N:DATA  组内主题分区N的的消费者 ;DATA为消费者ID,表示XXTopic下分区N的数据由指定的消费者进行消费

2) /brokers/ids:存放Broker列表,如果Broker与Zookeeper失去连接,则会自动注销在/brokers/ids下的broker记录,例子如下:

    /brokers/ids/xxBroker

3) /brokers/topics-pub:存放发布的主题列表以及对应的可发送消息的Broker列表,例子如下:

    /brokers/topics-pub/xxTopic/xxBroker

    /brokers/topics-pub下记录的是可发送消息到xxTopic的Broker列表,意味着有多少个Broker允许存储Client发送到Topic数据

4) /brokers/topics-sub:存放订阅的主题列表以及对应可订阅的Broker列表,例子如下:

    /brokers/topics-sub/xxTopic/xxBroker

    /brokers/topics-sub下记录的可订阅xxTopic的Broker列表,意味着有多少个Broker允许被Client订阅topic的数据

具体代码如下:

public MetaZookeeper(final ZkClient zkClient, final String root) {
//zk客户端
this.zkClient = zkClient;
//根路径,默认为空
this.metaRoot = this.normalize(root);
//前面讲的消费者列表
this.consumersPath = this.metaRoot + "/consumers";
//前面讲的brokers列表
this.brokerIdsPath = this.metaRoot + "/brokers/ids";
//前面讲的/brokers/topics-pub
this.brokerTopicsPubPath = this.metaRoot + "/brokers/topics-pub";
//前面讲的/brokers/topics-sub
this.brokerTopicsSubPath = this.metaRoot + "/brokers/topics-sub";
}

 至于更复杂的,我们将在后面具体再进行分析,主要先了解该存储结构即可。

回归正题 , registerTopicsInZk 方法完成向 zookeeper 注册 topic 和分区信息功能

private void registerTopicsInZk() throws Exception {
// 先注册配置的topic到zookeeper
for (final String topic : this.metaConfig.getTopics()) {
this.brokerZooKeeper.registerTopicInZk(topic, true);
}
// 注册加载的topic到zookeeper
        // 从下面代码可以看出,如果当前没有配置的topic,但前面配置过的topic如果有消息存在,依然会向zk注册,在某种程度,我认为这个设计不好,为什么?
答:我们前面分析过MessageStoreManager类,里面有getMessageStore()方法和getOrCreateMessageStore()方法,在调用getMessageStore()方法时没有检查参数topic是否在topicsPatSet列表中(topicsPatSet只包含了配置的topic),而getOrCreateMessageStore()方法却检查了,这就意味着使用getOrCreateMessageStore()方法时,如果要查询获取不在topicsPatSet列表中的MessageStore实例会抛出异常,而调用getMessageStore()不会,让人产生疑惑。个人见解认为一旦配置发生更改,如果要做热加载的话则先卸载再重新加载会更合适,而且在getOrCreateMessageStore()和getMessageStore()方法都使用topicsPatSet进行判断,保持一致性
for (final String topic : this.storeManager.getMessageStores().keySet()) {
this.brokerZooKeeper.registerTopicInZk(topic, true);
}
}

MetaMorphosisBroker 还有两个方法,一个是 newDeletePolicy() 方法,另一个是stop() 方法。 newDeletePolicy() 用于生产全局的存储模块的删除策略,如果没有配置删除策略,则使用该策略。

//全局删除策略
private DeletePolicy newDeletePolicy(final MetaConfig metaConfig) {
final String deletePolicy = metaConfig.getDeletePolicy();
if (deletePolicy != null) {
return DeletePolicyFactory.getDeletePolicy(deletePolicy);
}
return null;
}

而 stop() 方法则主要在 MetaMorphosisBroker 关闭的时候销毁资源,尽力保证MetaQ 的正确关闭。

public synchronized void stop() {
//如果关闭了,则不再关闭
if (this.shutdown) {
return;
}
log.info("Stopping metamorphosis server...");
this.shutdown = true;
//关闭与zk连接,注销与当前节点相关的配置
this.brokerZooKeeper.close(this.registerZkSuccess);
try {
// Waiting for zookeeper to notify clients.
Thread.sleep(this.brokerZooKeeper.getZkConfig().zkSyncTimeMs);
} catch (InterruptedException e) {
// ignore
}
//释放线程池
this.executorsManager.dispose();
//释放存储模块
this.storeManager.dispose();
//释放统计模块
this.statsManager.dispose();
//关闭NIO Server
try {
this.remotingServer.stop();
} catch (final NotifyRemotingException e) {
log.error("Shutdown remoting server failed", e);
}
//释放输入输出流处理器
this.brokerProcessor.dispose();
//如果是独立的zk,则关闭zk
EmbedZookeeperServer.getInstance().stop();
//释放钩子	
if (!this.runShutdownHook && this.shutdownHook != null) {
Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
}
log.info("Stop metamorphosis server successfully");
}

今天, Broker 的分析先到这,前面介绍的 zk 中注册的结构是与 Client 相关的,这里也向大家介绍了一下,以后分析 Client 的时候,该结构将不再介绍了。

分享到:
评论

相关推荐

    metamorphosis(metaq)

    《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaQ向spark传数据

    在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...

    metaq-server-1.4.6.2.tar.gz

    1. 日志收集:MetaQ可用于收集分布在各服务器上的日志,统一管理和分析,提高运维效率。 2. 数据同步:在分布式数据库或缓存系统中,MetaQ可以作为数据变更的传播通道,保证数据的一致性。 3. 异步处理:对于耗时...

    metaq-server-1.4.6.2客户端+服务端

    MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...

    Metaq在JDk 7下的异常及解决方案

    本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...

    Metaq详细手册.docx

    Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    metaQ的安装包

    MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    MetaQ 分布式消息服务中间件.pdf

    MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...

    万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf

    ### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...

    支付宝钱包系统架构内部剖析(架构图)

    - **事务支持**:MetaQ支持两种类型的事务——本地事务和XA分布式事务。这两种事务类型能够满足支付宝钱包系统在处理复杂金融交易时对数据一致性的需求。 - **高可用复制**:MetaQ提供了异步复制和同步复制两种模式...

    实时数仓2.0——打怪升级之路.pdf

    综上所述,实时数仓2.0是一种先进的数据处理框架,它通过优化数据模型、提升处理速度、确保数据质量,以及利用高级状态管理技术,来满足企业对实时数据分析的高要求。这一解决方案为企业提供了更敏捷的业务洞察,...

    阿里巴巴企业诚信体系——从大数据到场景应用.pdf

    阿里巴巴企业诚信体系是基于大数据和先进技术构建的一套全面的安全架构,旨在从多个维度评估和管理企业信用风险。这个体系不仅涵盖了安全威胁情报、安全建设、应急响应和法律法规等多个关键领域,还利用自动化手段...

    开源技术大会2014-CSDN-蒋涛《关于开源的思考》

    在开源的推动下,许多技术都得到了长足的发展,如LVS(Linux Virtual Server)、Tengine、MetaQ、dubbo、cobar、Fastjson等。这些技术的成功案例表明,开源不仅是技术的共享,也是知识和创新的共享。 蒋涛的讲话...

    大数据技术22.pptx

    【大数据技术概述】 大数据,作为一个现代信息技术的关键领域,是指那些数据量庞大、增长迅速、...随着技术的不断进步,大数据将继续在数据分析、人工智能和机器学习等领域发挥关键作用,为企业和社会创造更大的价值。

Global site tag (gtag.js) - Google Analytics