`
javahacker2
  • 浏览: 43486 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

metaq源码分析(二)

 
阅读更多

消息,是MetaQ最重要的资源,在分析MetaQ之前必须了解的概念,我们所做的一切都是围绕消息进行的,让我们看看MetaQ中消息的定义是怎样的,MetaQ的类Message定义了消息的格式:

Java代码  收藏代码
  1. public class Message implements Serializable {  
  2.   private long id; //消息的ID  
  3.   private String topic; //消息属于哪个主题  
  4.   private byte[] data;  //消息的内容  
  5.   private String attribute; //消息的属性  
  6.   private int flag; //属性标志位,如果属性不为空,则该标志位true  
  7.   private Partition partition; //该主题下的哪个分区,简单的理解为发送到该主题下的哪个队列  
  8.   private transient boolean readOnly; //消息是否只读  
  9.   private transient boolean rollbackOnly = false//该消息是否需要回滚,主要用于事务实现的需要  
  10. }  

从对消息的定义,我们可以看出消息都有一个唯一的ID,并且归属于某个主题,发送到该主题下的某个分区,具有一些基本属性。

 

MetaQ分为Broker和Client以及Zookeeper,系统结构如下:

 

 

 

 

下面我们先分析Broker源码。

 

Broker主要围绕发送消息和消费消息的主线进行,对于Broker来说就是输入、输出流的处理。在该主线下,Broker主要分为如下模块:网络传输模块、消息存储模块、消息统计模块以及事务模块,本篇首先针对独立性较强的消息存储模块进行分析。

 

在进行存储模块分析之前,我们得了解Broker中的一个重要的类 MetaConfig,MetaConfig是Broker配置加载器,通过MetaConfig可以获取到各模块相关的配置,所以MetaConfig 是贯穿所有模块的类。MetaConfig实现MetaConfigMBean接口,该接口定义如下:

Java代码  收藏代码
  1. public interface MetaConfigMBean {  
  2.     /** 
  3.      * Reload topics configuration 
  4.      */  
  5.     public void reload();  
  6.    
  7.     /**关闭分区 */  
  8.     public void closePartitions(String topic, int start, int end);  
  9.    
  10.     /**打开一个topic的所有分区 */  
  11.     public void openPartitions(String topic);  
  12. }  

 

MetaConfig注册到了MBeanServer上,所以可以通过JMX协议重新加 载配置以及关闭和打开分区。为了加载的配置立即生效,MetaConfig内置了一个通知机制,可以通过向MetaConfig注册监听器的方式关注相关 配置的变化,监听器需实现PropertyChangeListener接口。

Java代码  收藏代码
  1. public void addPropertyChangeListener(final String propertyName, final PropertyChangeListener listener) {  
  2.       this.propertyChangeSupport.addPropertyChangeListener(propertyName, listener);  
  3. }  
  4.    
  5. public void removePropertyChangeListener(final PropertyChangeListener listener) {  
  6.       this.propertyChangeSupport.removePropertyChangeListener(listener);  
  7. }  

 

目前MetaConfig发出的事件通知有三种:配置文件发生变化(configFileChecksum)、主题发生变化(topics,topicConfigMap)以及刷新存储的频率发生变化(unflushInterval),代码如下:

Java代码  收藏代码
  1. //configFileChecksum通知  
  2. private Ini createIni(final File file) throws IOException, InvalidFileFormatException {  
  3.       ……  
  4.       this.propertyChangeSupport.firePropertyChange("configFileChecksum"nullnull);  
  5.       return conf;  
  6. }  
  7.    
  8. public void setConfigFileChecksum(long configFileChecksum) {  
  9.       this.configFileChecksum = configFileChecksum;  
  10.       this.propertyChangeSupport.firePropertyChange("configFileChecksum"nullnull);  
  11. }  
  12.    
  13. //topics、topicConfigMap和unflushInterval通知  
  14. private void populateTopicsConfig(final Ini conf) {  
  15. ……  
  16. if (!newTopicConfigMap.equals(this.topicConfigMap)) {  
  17.           this.topics = newTopics;  
  18.           this.topicConfigMap = newTopicConfigMap;  
  19.           this.propertyChangeSupport.firePropertyChange("topics"nullnull);  
  20.           this.propertyChangeSupport.firePropertyChange("topicConfigMap"nullnull);  
  21.   }  
  22.   this.propertyChangeSupport.firePropertyChange("unflushInterval"nullnull);  
  23. ……  

需要注意的是,调用reload方法时,只对topic的配置生效,对全局配置不生效,只重载topic的配置。

 

好吧,废话了许多,让我们正式进入存储模块的分析吧。

 

Broker的存储模块用于存储Client发送的等待被消费的消息,Broker采用文件存储的方式来存储消息,存储模块类图如下:

 



  

 

MessageSet代表一个消息集合,可能是一个文件也可能是文件的一部分,其定义如下:

 

Java代码  收藏代码
  1. /** 
  2.  * 消息集合 
  3.  */  
  4. public interface MessageSet {  
  5.   public MessageSet slice(long offset, long limit) throws IOException; //获取一个消息集合  
  6.    
  7.   public void write(GetCommand getCommand, SessionContext ctx);  
  8.    
  9.   public long append(ByteBuffer buff) throws IOException; //存储一个消息,这时候还没有存储到磁盘,需要调用flush方法才能保证存储到磁盘  
  10.    
  11.   public void flush() throws IOException; //提交到磁盘  
  12.    
  13.   public void read(final ByteBuffer bf, long offset) throws IOException; //读取消息  
  14.    
  15.   public void read(final ByteBuffer bf) throws IOException; //读取消息  
  16.    
  17.   public long getMessageCount();//该集合的消息数量  
  18. }  

FileMessageSet实现了MessageSet接口和Closeable接口,实现Closeable接口主要是为了在文件关闭的时候确保文件通道关闭以及内容是否提交到磁盘

 

Java代码  收藏代码
  1. public void close() throws IOException {  
  2.       if (!this.channel.isOpen()) {  
  3.           return;  
  4.       }  
  5.       //保证在文件关闭前,将内容提交到磁盘,而不是在缓存中  
  6.       if (this.mutable) {  
  7.           this.flush();  
  8.       }  
  9.       //关闭文件通道  
  10.       this.channel.close();  
  11.   }  

下面让我们来具体分析一下FileMessageSet这个类,

Java代码  收藏代码
  1. public class FileMessageSet implements MessageSet, Closeable {  
  2.     ……  
  3.   private final FileChannel channel; //对应的文件通道  
  4.   private final AtomicLong messageCount; //内容数量  
  5.   private final AtomicLong sizeInBytes;   
  6.   private final AtomicLong highWaterMark; // 已经确保写入磁盘的水位  
  7.   private final long offset; // 镜像offset  
  8.   private boolean mutable;   // 是否可变  
  9.    
  10.   public FileMessageSet(final FileChannel channel, final long offset, final long limit, final boolean mutable) throws IOException {  
  11.       this.channel = channel;  
  12.       this.offset = offset;  
  13.       this.messageCount = new AtomicLong(0);  
  14.       this.sizeInBytes = new AtomicLong(0);  
  15.       this.highWaterMark = new AtomicLong(0);  
  16.       this.mutable = mutable;  
  17.       if (mutable) {   
  18.           final long startMs = System.currentTimeMillis();  
  19.           final long truncated = this.recover();  
  20.           if (this.messageCount.get() > 0) {  
  21.               log.info("Recovery succeeded in " + (System.currentTimeMillis() - startMs) / 1000 + " seconds. " + truncated + " bytes truncated.");  
  22.           }  
  23.       } else {  
  24.           try {  
  25.               this.sizeInBytes.set(Math.min(channel.size(), limit) - offset);  
  26.               this.highWaterMark.set(this.sizeInBytes.get());  
  27.           } catch (final Exception e) {  
  28.               log.error("Set sizeInBytes error", e);  
  29.           }  
  30.       }  
  31.   }  
  32. // 注意FileMessageSet的mutable属性,如果mutable为true的时候,将会调用recover()方法,该方法主要是验证文件内 容的完整性,后面会详细介绍,如果mutable为false的时候,表明该文件不可更改,这个时候磁盘水位和sizeInBytes的值均为文件大小。  
  33.    
  34. ……  
  35. public long append(final ByteBuffer buf) throws IOException {  
  36.       //如果mutable属性为false的时候,不允许追加消息在文件尾  
  37.       if (!this.mutable) {  
  38.           throw new UnsupportedOperationException("Immutable message set");  
  39.       }  
  40.       final long offset = this.sizeInBytes.get();  
  41.       int sizeInBytes = 0;  
  42.       while (buf.hasRemaining()) {  
  43.           sizeInBytes += this.channel.write(buf);  
  44.       }  
  45.       //在这个时候并没有将内容写入磁盘,还在通道的缓存中,需要在适当的时候调用flush方法  
  46.        //这个时候磁盘的水位是不更新的,只有在保证写入磁盘才会更新磁盘的水位信息  
  47.       this.sizeInBytes.addAndGet(sizeInBytes); 、  
  48.       this.messageCount.incrementAndGet();  
  49.       return offset;  
  50.   }  
  51.    
  52.    //提交到磁盘  
  53.   public void flush() throws IOException {  
  54.       this.channel.force(true);  
  55.       this.highWaterMark.set(this.sizeInBytes.get());  
  56.   }  
  57. ……  
  58. @Override  
  59.   public MessageSet slice(final long offset, final long limit) throws IOException {  
  60.       //返回消息集  
  61. return new FileMessageSet(this.channel, offset, limit, false);  
  62.   }  
  63.    
  64.   static final Logger transferLog = LoggerFactory.getLogger("TransferLog");  
  65.    
  66.   @Override  
  67.   public void read(final ByteBuffer bf, final long offset) throws IOException {  
  68.       //读取内容  
  69. int size = 0;  
  70.       while (bf.hasRemaining()) {  
  71.           final int l = this.channel.read(bf, offset + size);  
  72.           if (l < 0) {  
  73.               break;  
  74.           }  
  75.           size += l;  
  76.       }  
  77.   }  
  78.    
  79.   @Override  
  80.   public void read(final ByteBuffer bf) throws IOException {  
  81.       this.read(bf, this.offset);  
  82.   }  
  83.    
  84.  //下面的write方法主要是提供zero拷贝  
  85.   @Override  
  86.   public void write(final GetCommand getCommand, final SessionContext ctx) {  
  87.       final IoBuffer buf = this.makeHead(getCommand.getOpaque(), this.sizeInBytes.get());  
  88.       // transfer to socket  
  89.       this.tryToLogTransferInfo(getCommand, ctx.getConnection());  
  90.       ctx.getConnection().transferFrom(buf, nullthis.channel, this.offset, this.sizeInBytes.get());  
  91.   }  
  92.    
  93.   public long write(final WritableByteChannel socketChanel) throws IOException {  
  94.       try {  
  95.           return this.getFileChannel().transferTo(this.offset, this.getSizeInBytes(), socketChanel);  
  96.       } catch (final IOException e) {  
  97.           // Check to see if the IOException is being thrown due to  
  98.           // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988  
  99.           final String message = e.getMessage();  
  100.           if (message != null && message.contains("temporarily unavailable")) {  
  101.               return 0;  
  102.           }  
  103.           throw e;  
  104.       }  
  105.   }  
  106. ……  
  107. private static boolean fastBoot = Boolean.valueOf(System.getProperty("meta.fast_boot""false"));  
  108.    
  109.   private long recover() throws IOException {  
  110.        //如果在System属性里设置了 meta.fast_boot为true,表示快速启动,快速启动不检测文件是否损坏,不对内容进行校验  
  111.       if (fastBoot) {  
  112.           final long size = this.channel.size();  
  113.           this.sizeInBytes.set(size);  
  114.           this.highWaterMark.set(size);  
  115.           this.messageCount.set(0);  
  116.           this.channel.position(size);  
  117.           return 0;  
  118.       }  
  119.       if (!this.mutable) {  
  120.           throw new UnsupportedOperationException("Immutable message set");  
  121.       }  
  122.       final long len = this.channel.size();  
  123.       final ByteBuffer buf = ByteBuffer.allocate(MessageUtils.HEADER_LEN);  
  124.       long validUpTo = 0L;  
  125.       long next = 0L;  
  126.       long msgCount = 0;  
  127.       do {  
  128.           next = this.validateMessage(buf, validUpTo, len);  
  129.           if (next >= 0) {  
  130.               msgCount++;  
  131.               validUpTo = next;  
  132.           }  
  133.       } while (next >= 0);  
  134.       this.channel.truncate(validUpTo);  
  135.       this.sizeInBytes.set(validUpTo);  
  136.       this.highWaterMark.set(validUpTo);  
  137.       this.messageCount.set(msgCount);  
  138.       this.channel.position(validUpTo);  
  139.       return len - validUpTo;  
  140.   }  
  141.    
  142. 消息在磁盘上存储结构如下:  
  143.   /* 
  144.    * 20个字节的头部 * 
  145.    * <ul> 
  146.    * <li>message length(4 bytes),including attribute and payload</li> 
  147.    * <li>checksum(4 bytes)</li> 
  148.    * <li>message id(8 bytes)</li> 
  149.    * <li>message flag(4 bytes)</li> 
  150.    * </ul> 
  151.    *  length长度的内容 
  152.       */  
  153.   //在存储之前将checksum的信息存入到磁盘,读取的时候再进行校验,比较前后的  
  154.   //checksum是否一致,防止消息被篡改  
  155.   private long validateMessage(final ByteBuffer buf, final long start, final long len) throws IOException {  
  156.       buf.rewind();  
  157.       long read = this.channel.read(buf);  
  158.       if (read < MessageUtils.HEADER_LEN) {  
  159.           return -1;  
  160.       }  
  161.       buf.flip();  
  162.       final int messageLen = buf.getInt();  
  163.       final long next = start + MessageUtils.HEADER_LEN + messageLen;  
  164.       if (next > len) {  
  165.           return -1;  
  166.       }  
  167.       final int checksum = buf.getInt();  
  168.       if (messageLen < 0) {  
  169.           // 数据损坏  
  170.           return -1;  
  171.       }  
  172.    
  173.       final ByteBuffer messageBuffer = ByteBuffer.allocate(messageLen);  
  174. //        long curr = start + MessageUtils.HEADER_LEN;  
  175.       while (messageBuffer.hasRemaining()) {  
  176.           read = this.channel.read(messageBuffer);  
  177.           if (read < 0) {  
  178.               throw new IOException("文件在recover过程中被修改");  
  179.           }  
  180. //            curr += read;  
  181.       }  
  182.       if (CheckSum.crc32(messageBuffer.array()) != checksum) {  
  183.           //采用crc32对内容进行校验是否一致  
  184.           return -1;  
  185.       } else {  
  186.           return next;  
  187.       }  
  188.   }  

    FileMessageSet是MetaQ 服务器端存储的一个元素,代表了对一个文件的读写操作,能够对文件内容进行完整性和一致性校验,并能提供统计数据,接下来要介绍的是通过 MessageStore怎样的组合将一个个的FileMessageSet,单纯的讲,MetaQ的存储非常值得借鉴。

分享到:
评论
1 楼 jiangduxi 2015-06-14  
您好,请教下这个开源中间件是否适用 IM。尤其是移动通讯中的弱网络的情况!
谢谢

相关推荐

    metamorphosis(metaq)

    二、MetaQ架构 MetaQ采用主从复制的架构,确保服务的高可用性。每个主题(Topic)可以有多个分区(Partition),每个分区有一个主节点和多个备份节点。当主节点故障时,备份节点能够快速接管,保证服务不中断。此外...

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaQ向spark传数据

    在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...

    metaq-server-1.4.6.2.tar.gz

    二、MetaQ Server 1.4.6.2特性 1. 高可用:MetaQ Server通过主备切换机制确保服务的连续性,当主节点故障时,备份节点可以快速接管服务,保证业务不受影响。 2. 高性能:采用多线程并行处理和异步I/O模型,实现...

    metaq-server-1.4.6.2客户端+服务端

    MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...

    Metaq在JDk 7下的异常及解决方案

    本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...

    Metaq详细手册.docx

    《Metaq详细手册》 Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息...

    metaQ的安装包

    - 日志分析:监控日志可以帮助排查问题,优化系统性能。 - 性能调优:根据系统负载调整 Broker 和 Controller 的配置,如内存分配、线程池大小等。 - 故障恢复:如果某台服务器出现故障,可以通过 ZooKeeper 自动...

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    MetaQ 分布式消息服务中间件.pdf

    MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...

    支付宝之所以牛逼的原因:来看内部架构剖析

    Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...

    阿里rocketMQ

    在实际应用中,RocketMQ常用于构建大型分布式系统的事件驱动架构,例如订单处理、日志收集、用户行为分析等场景。由于其优秀的性能和稳定性,RocketMQ在电商、金融、物联网等多个领域得到了广泛应用。学习和掌握...

    支付宝钱包系统架构内部剖析(架构图)

    通过上述分析可以看出,支付宝钱包系统的架构设计不仅考虑到了高并发场景下的性能优化问题,还特别注重了系统的稳定性和安全性。其中,Metamorphosis (MetaQ)作为支付宝内部广泛使用的一款分布式消息中间件,凭借其...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    Metamorphosis, 一种高可用高性能的分布式.zip

    Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码

    Storm项目:流数据监控(下)

    该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ...

Global site tag (gtag.js) - Google Analytics