`
wbj0110
  • 浏览: 1610363 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

MetaQ技术内幕——源码分析(六)

阅读更多

前面介绍过MetaQ使用gecko框架作为网络传输框架,Gecko采用请求/响应的方式组织传输。MetaQ依据定义了请求和响应的命令,由于命令Client和Broker均需要使用,所以放在了common工程的类MetaEncodeCommand中:

Java代码 复制代码 收藏代码
  1. public String GET_CMD = "get";  //请求数据请求 
  2. public String RESULT_CMD = "result"; //结果响应(不包括消息) 
  3. public String OFFSET_CMD = "offset"; //查询最近有效的offset请求 
  4. public String PUT_CMD  = "put";  //发送消息命令请求 
  5. public String SYNC_CMD = "sync"; //同步数据请求 
  6. public String QUIT_CMD = "quit";  //退出请求,客户端发送此命令后,服务器将主动关闭连接 
  7. public String VERSION_CMD = "version"; //查询服务器版本请求,也用于心跳检测 
  8. public String STATS_CMD = "stats"; //查询统计信息请求 
  9. public String TRANS_CMD = "transaction"; //事务请求 
  10. //还有一个响应,这里没有响应头的定义,就是返回的消息集 
    public String GET_CMD = "get";  //请求数据请求    public String RESULT_CMD = "result"; //结果响应(不包括消息)    public String OFFSET_CMD = "offset"; //查询最近有效的offset请求    public String PUT_CMD  = "put";  //发送消息命令请求    public String SYNC_CMD = "sync"; //同步数据请求    public String QUIT_CMD = "quit";  //退出请求,客户端发送此命令后,服务器将主动关闭连接    public String VERSION_CMD = "version"; //查询服务器版本请求,也用于心跳检测    public String STATS_CMD = "stats"; //查询统计信息请求    public String TRANS_CMD = "transaction"; //事务请求    //还有一个响应,这里没有响应头的定义,就是返回的消息集

在分析Broker启动类MetaMorphosisBroker时,分析过registerProcessors()方法,针对于不同的请求,Broker注册了不同的处理器,详情见registerProcessors()方法。MetaQ传输采用文本协议设计,非常透明,MetaEncodeCommand定义是请求类型。

Broker分为请求和响应的命令,先看看请求的类图:


注:由于类图工具出现问题,AbstractRequestCommand跟RequestCommand的实现关系并画成依赖关系,请大家理解成实现关系,以后类图同样这样理解(除非接口与接口或者类与类关系使用依赖箭头一定是描述成依赖的)。

所有的请求的命令均继承AbstractRequestCommand类并实现RequestCommand接口,RequestCommand是Gecko框架定义的接口,所有的命令均在编码后能被Gecko框架组织传输,传输协议是透明的文本协议。AbstractRequestCommand定义了基本属性topic和opaque。

Java代码 复制代码 收藏代码
  1. public abstract class AbstractRequestCommand implements RequestCommand, MetaEncodeCommand { 
  2.     private Integer opaque; //主要用于标识请求,响应的时候该标识被带回,用于客户端区分是哪个请求的响应 
  3.     private String topic;  
public abstract class AbstractRequestCommand implements RequestCommand, MetaEncodeCommand {	private Integer opaque; //主要用于标识请求,响应的时候该标识被带回,用于客户端区分是哪个请求的响应	private String topic; }

响应命令的类图如下:


请求命令只分为两类,带有消息集合的响应(DataCommand);带有其他结果的响应(BooleanCommand)。DataCommand里携带的消息的格式与消息的存储结构一直,这样可以提高Broker的处理能力,将消息解析、正确性等验证放在Client,充分发挥Client的计算能力。这里比较麻烦的一点就是其他结果的响应均有BooleanCommand完成,BooleanCommand中只有code和message熟悉,code用来返回响应状态码,比如统计结果的信息的携带就必须由message熟悉来完成,所以结果的响应能力有限,而且必须先转换成字符串,具体如下:

Java代码 复制代码 收藏代码
  1. /**
  2. * 应答命令,协议格式如下:</br> result code length opaque\r\n message
  3. */ 
  4. public class BooleanCommand extends AbstractResponseCommand implements BooleanAckCommand { 
  5.     private String message; 
  6.     /**
  7.      * status code in http protocol
  8.      */ 
  9.     private final int code;//响应的状态码 
  10.     public BooleanCommand(final int code, final String message, final Integer opaque) { 
  11.         super(opaque); 
  12.         this.code = code; 
  13.         switch (this.code) { 
  14.             case HttpStatus.Success: 
  15.                 this.setResponseStatus(ResponseStatus.NO_ERROR); 
  16.                 break; 
  17.             default: 
  18.                 this.setResponseStatus(ResponseStatus.ERROR); 
  19.                 break; 
  20.         } 
  21.         this.message = message; 
  22.     } 
  23.  
  24.     public String getErrorMsg() { 
  25.         return this.message; 
  26.     } 
  27.  
  28.     public int getCode() { 
  29.         return this.code; 
  30.     } 
  31.  
  32.     public void setErrorMsg(final String errorMsg) { 
  33.         this.message = errorMsg; 
  34.     } 
  35.  
  36.     public IoBuffer encode() { 
  37. //对结果进行编码,以便能在网络上传输 
  38.         final byte[] bytes = ByteUtils.getBytes(this.message); 
  39.         final int messageLen = bytes == null ? 0 : bytes.length; 
  40.         final IoBuffer buffer = IoBuffer.allocate(11 + ByteUtils.stringSize(this.code) 
  41.                 + ByteUtils.stringSize(this.getOpaque()) + ByteUtils.stringSize(messageLen) + messageLen); 
  42.         ByteUtils.setArguments(buffer, MetaEncodeCommand.RESULT_CMD, this.code, messageLen, this.getOpaque()); 
  43.         if (bytes != null) { 
  44.             buffer.put(bytes); 
  45.         } 
  46.         buffer.flip(); 
  47.         return buffer; 
  48.     } 
/** * 应答命令,协议格式如下:</br> result code length opaque\r\n message */public class BooleanCommand extends AbstractResponseCommand implements BooleanAckCommand {	private String message;	/**	 * status code in http protocol	 */	private final int code;//响应的状态码	public BooleanCommand(final int code, final String message, final Integer opaque) {		super(opaque);		this.code = code;		switch (this.code) {			case HttpStatus.Success:				this.setResponseStatus(ResponseStatus.NO_ERROR);				break;			default:				this.setResponseStatus(ResponseStatus.ERROR);				break;		}		this.message = message;	}	public String getErrorMsg() {		return this.message;	}	public int getCode() {		return this.code;	}	public void setErrorMsg(final String errorMsg) {		this.message = errorMsg;	}	public IoBuffer encode() {//对结果进行编码,以便能在网络上传输		final byte[] bytes = ByteUtils.getBytes(this.message);		final int messageLen = bytes == null ? 0 : bytes.length;		final IoBuffer buffer = IoBuffer.allocate(11 + ByteUtils.stringSize(this.code)				+ ByteUtils.stringSize(this.getOpaque()) + ByteUtils.stringSize(messageLen) + messageLen);		ByteUtils.setArguments(buffer, MetaEncodeCommand.RESULT_CMD, this.code, messageLen, this.getOpaque());		if (bytes != null) {			buffer.put(bytes);		}		buffer.flip();		return buffer;	}}

前面讲到的每个请求都会携带一个属性opaque并且该opaque将会在响应里被带回, AbstractResponseCommand里定义了该被带回的属性opaque。

Java代码 复制代码 收藏代码
  1. /**
  2. * 应答命令基类
  3. */ 
  4. public abstract class AbstractResponseCommand implements ResponseCommand, MetaEncodeCommand { 
  5.     private Integer opaque;  
  6.     private InetSocketAddress responseHost; // responseHost和responseTime尚未发现在哪来调用,预计是作者预留的属性 
  7.     private long responseTime; 
  8.     private ResponseStatus responseStatus; //响应状态 
/** * 应答命令基类 */public abstract class AbstractResponseCommand implements ResponseCommand, MetaEncodeCommand {	private Integer opaque; 	private InetSocketAddress responseHost; // responseHost和responseTime尚未发现在哪来调用,预计是作者预留的属性	private long responseTime;	private ResponseStatus responseStatus; //响应状态}

不知道研究过MetaQ源码的朋友们发现DataCommand的encode()是一个空实现没,虽然作者有注释,运行Broker确实不存在问题,但就从设计者的角度来看,还是有些小问题的,代码如下:

Java代码 复制代码 收藏代码
  1. public class DataCommand extends AbstractResponseCommand { 
  2.     private final byte[] data; 
  3. …… 
  4.     @Override 
  5.     public IoBuffer encode() { 
  6.     //作者注释: 不做任何事情,发送data command由transferTo替代 
  7.        //笔者注释:因为Borker采用了BrokerCommandProcessor 中zeroCopy的机制,所以不会该encode方法的调用,但如果以后改动后,允许zeroCopy变成可配置项,如果该方法不实现,就会出现解析问题,因为配置容易加上,但容易忘记该处的实现。 
  8.         return null; 
  9.     } 
public class DataCommand extends AbstractResponseCommand {	private final byte[] data;……	@Override	public IoBuffer encode() {	//作者注释: 不做任何事情,发送data command由transferTo替代       //笔者注释:因为Borker采用了BrokerCommandProcessor 中zeroCopy的机制,所以不会该encode方法的调用,但如果以后改动后,允许zeroCopy变成可配置项,如果该方法不实现,就会出现解析问题,因为配置容易加上,但容易忘记该处的实现。		return null;	}}
分享到:
评论

相关推荐

    metamorphosis(metaq)

    《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaQ向spark传数据

    在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...

    metaq-server-1.4.6.2.tar.gz

    1. 日志收集:MetaQ可用于收集分布在各服务器上的日志,统一管理和分析,提高运维效率。 2. 数据同步:在分布式数据库或缓存系统中,MetaQ可以作为数据变更的传播通道,保证数据的一致性。 3. 异步处理:对于耗时...

    metaq-server-1.4.6.2客户端+服务端

    MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...

    Metaq在JDk 7下的异常及解决方案

    本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...

    Metaq详细手册.docx

    Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    metaQ的安装包

    MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    MetaQ 分布式消息服务中间件.pdf

    MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...

    阿里消息中间件MetaQ学习Demo.zip

    阿里消息中间件MetaQ学习Demo

    万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf

    ### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...

    支付宝钱包系统架构内部剖析(架构图)

    - **事务支持**:MetaQ支持两种类型的事务——本地事务和XA分布式事务。这两种事务类型能够满足支付宝钱包系统在处理复杂金融交易时对数据一致性的需求。 - **高可用复制**:MetaQ提供了异步复制和同步复制两种模式...

    实时数仓2.0——打怪升级之路.pdf

    综上所述,实时数仓2.0是一种先进的数据处理框架,它通过优化数据模型、提升处理速度、确保数据质量,以及利用高级状态管理技术,来满足企业对实时数据分析的高要求。这一解决方案为企业提供了更敏捷的业务洞察,...

    阿里巴巴企业诚信体系——从大数据到场景应用.pdf

    阿里巴巴企业诚信体系是基于大数据和先进技术构建的一套全面的安全架构,旨在从多个维度评估和管理企业信用风险。这个体系不仅涵盖了安全威胁情报、安全建设、应急响应和法律法规等多个关键领域,还利用自动化手段...

    开源技术大会2014-CSDN-蒋涛《关于开源的思考》

    在开源的推动下,许多技术都得到了长足的发展,如LVS(Linux Virtual Server)、Tengine、MetaQ、dubbo、cobar、Fastjson等。这些技术的成功案例表明,开源不仅是技术的共享,也是知识和创新的共享。 蒋涛的讲话...

Global site tag (gtag.js) - Google Analytics