`
wbj0110
  • 浏览: 1610315 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

MetaQ技术内幕——源码分析(二)

阅读更多

消息,是MetaQ最重要的资源,在分析MetaQ之前必须了解的概念,我们所做的一切都是围绕消息进行的,让我们看看MetaQ中消息的定义是怎样的,MetaQ的类Message定义了消息的格式:

Java代码 复制代码 收藏代码
  1. public class Messageimplements Serializable { 
  2.   private long id;//消息的ID 
  3.   private String topic; //消息属于哪个主题 
  4.   private byte[] data; //消息的内容 
  5.   private String attribute; //消息的属性 
  6.   private int flag;//属性标志位,如果属性不为空,则该标志位true 
  7.   private Partition partition; //该主题下的哪个分区,简单的理解为发送到该主题下的哪个队列 
  8.   private transientboolean readOnly; //消息是否只读 
  9.   private transientboolean rollbackOnly = false;//该消息是否需要回滚,主要用于事务实现的需要 
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. public class Message implements Serializable {  
  2.   private long id; //消息的ID  
  3.   private String topic; //消息属于哪个主题  
  4.   private byte[] data;  //消息的内容  
  5.   private String attribute; //消息的属性  
  6.   private int flag; //属性标志位,如果属性不为空,则该标志位true  
  7.   private Partition partition; //该主题下的哪个分区,简单的理解为发送到该主题下的哪个队列  
  8.   private transient boolean readOnly; //消息是否只读  
  9.   private transient boolean rollbackOnly = false//该消息是否需要回滚,主要用于事务实现的需要  
  10. }  

从对消息的定义,我们可以看出消息都有一个唯一的ID,并且归属于某个主题,发送到该主题下的某个分区,具有一些基本属性。

 

MetaQ分为Broker和Client以及Zookeeper,系统结构如下:


 

 

下面我们先分析Broker源码。

 

Broker主要围绕发送消息和消费消息的主线进行,对于Broker来说就是输入、输出流的处理。在该主线下,Broker主要分为如下模块:网络传输模块、消息存储模块、消息统计模块以及事务模块,本篇首先针对独立性较强的消息存储模块进行分析。

 

在进行存储模块分析之前,我们得了解Broker中的一个重要的类MetaConfig,MetaConfig是Broker配置加载器,通过MetaConfig可以获取到各模块相关的配置,所以MetaConfig是贯穿所有模块的类。MetaConfig实现MetaConfigMBean接口,该接口定义如下:

Java代码 复制代码 收藏代码
  1. public interface MetaConfigMBean { 
  2.     /**
  3.      * Reload topics configuration
  4.      */ 
  5.     public void reload(); 
  6.   
  7.     /**关闭分区 */ 
  8.     public void closePartitions(String topic,int start, int end); 
  9.   
  10.     /**打开一个topic的所有分区 */ 
  11.     public void openPartitions(String topic); 
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. public interface MetaConfigMBean {  
  2.     /** 
  3.      * Reload topics configuration 
  4.      */  
  5.     public void reload();  
  6.    
  7.     /**关闭分区 */  
  8.     public void closePartitions(String topic, int start, int end);  
  9.    
  10.     /**打开一个topic的所有分区 */  
  11.     public void openPartitions(String topic);  
  12. }  

 

MetaConfig注册到了MBeanServer上,所以可以通过JMX协议重新加载配置以及关闭和打开分区。为了加载的配置立即生效,MetaConfig内置了一个通知机制,可以通过向MetaConfig注册监听器的方式关注相关配置的变化,监听器需实现PropertyChangeListener接口。

Java代码 复制代码 收藏代码
  1. public void addPropertyChangeListener(final String propertyName,finalPropertyChangeListener listener) { 
  2.       this.propertyChangeSupport.addPropertyChangeListener(propertyName, listener); 
  3.   
  4. public void removePropertyChangeListener(final PropertyChangeListener listener) { 
  5.       this.propertyChangeSupport.removePropertyChangeListener(listener); 
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. public void addPropertyChangeListener(final String propertyName, final PropertyChangeListener listener) {  
  2.       this.propertyChangeSupport.addPropertyChangeListener(propertyName, listener);  
  3. }  
  4.    
  5. public void removePropertyChangeListener(final PropertyChangeListener listener) {  
  6.       this.propertyChangeSupport.removePropertyChangeListener(listener);  
  7. }  

 

目前MetaConfig发出的事件通知有三种:配置文件发生变化(configFileChecksum)、主题发生变化(topics,topicConfigMap)以及刷新存储的频率发生变化(unflushInterval),代码如下:

Java代码 复制代码 收藏代码
  1. //configFileChecksum通知 
  2. private Ini createIni(final File file)throws IOException, InvalidFileFormatException { 
  3.       …… 
  4.       this.propertyChangeSupport.firePropertyChange("configFileChecksum",nullnull); 
  5.       return conf; 
  6.   
  7. public void setConfigFileChecksum(long configFileChecksum) { 
  8.       this.configFileChecksum = configFileChecksum; 
  9.       this.propertyChangeSupport.firePropertyChange("configFileChecksum",nullnull); 
  10.   
  11. //topics、topicConfigMap和unflushInterval通知 
  12. private void populateTopicsConfig(final Ini conf) { 
  13. …… 
  14. if (!newTopicConfigMap.equals(this.topicConfigMap)) { 
  15.           this.topics = newTopics; 
  16.           this.topicConfigMap = newTopicConfigMap; 
  17.           this.propertyChangeSupport.firePropertyChange("topics",nullnull); 
  18.           this.propertyChangeSupport.firePropertyChange("topicConfigMap",nullnull); 
  19.   } 
  20.   this.propertyChangeSupport.firePropertyChange("unflushInterval",nullnull); 
  21. …… 
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. //configFileChecksum通知  
  2. private Ini createIni(final File file) throws IOException, InvalidFileFormatException {  
  3.       ……  
  4.       this.propertyChangeSupport.firePropertyChange("configFileChecksum"nullnull);  
  5.       return conf;  
  6. }  
  7.    
  8. public void setConfigFileChecksum(long configFileChecksum) {  
  9.       this.configFileChecksum = configFileChecksum;  
  10.       this.propertyChangeSupport.firePropertyChange("configFileChecksum"nullnull);  
  11. }  
  12.    
  13. //topics、topicConfigMap和unflushInterval通知  
  14. private void populateTopicsConfig(final Ini conf) {  
  15. ……  
  16. if (!newTopicConfigMap.equals(this.topicConfigMap)) {  
  17.           this.topics = newTopics;  
  18.           this.topicConfigMap = newTopicConfigMap;  
  19.           this.propertyChangeSupport.firePropertyChange("topics"nullnull);  
  20.           this.propertyChangeSupport.firePropertyChange("topicConfigMap"nullnull);  
  21.   }  
  22.   this.propertyChangeSupport.firePropertyChange("unflushInterval"nullnull);  
  23. ……  

需要注意的是,调用reload方法时,只对topic的配置生效,对全局配置不生效,只重载topic的配置。

 

好吧,废话了许多,让我们正式进入存储模块的分析吧。

 

Broker的存储模块用于存储Client发送的等待被消费的消息,Broker采用文件存储的方式来存储消息,存储模块类图如下:

 



 

 

MessageSet代表一个消息集合,可能是一个文件也可能是文件的一部分,其定义如下:

 

Java代码 复制代码 收藏代码
  1. /**
  2. * 消息集合
  3. */ 
  4. public interface MessageSet { 
  5.   public MessageSet slice(long offset,long limit) throws IOException;//获取一个消息集合 
  6.   
  7.   public void write(GetCommand getCommand, SessionContext ctx); 
  8.   
  9.   public long append(ByteBuffer buff)throws IOException; //存储一个消息,这时候还没有存储到磁盘,需要调用flush方法才能保证存储到磁盘 
  10.   
  11.   public void flush()throws IOException; //提交到磁盘 
  12.   
  13.   public void read(final ByteBuffer bf,long offset) throws IOException;//读取消息 
  14.   
  15.   public void read(final ByteBuffer bf)throws IOException; //读取消息 
  16.   
  17.   public long getMessageCount();//该集合的消息数量 
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. /** 
  2.  * 消息集合 
  3.  */  
  4. public interface MessageSet {  
  5.   public MessageSet slice(long offset, long limit) throws IOException; //获取一个消息集合  
  6.    
  7.   public void write(GetCommand getCommand, SessionContext ctx);  
  8.    
  9.   public long append(ByteBuffer buff) throws IOException; //存储一个消息,这时候还没有存储到磁盘,需要调用flush方法才能保证存储到磁盘  
  10.    
  11.   public void flush() throws IOException; //提交到磁盘  
  12.    
  13.   public void read(final ByteBuffer bf, long offset) throws IOException; //读取消息  
  14.    
  15.   public void read(final ByteBuffer bf) throws IOException; //读取消息  
  16.    
  17.   public long getMessageCount();//该集合的消息数量  
  18. }  

FileMessageSet实现了MessageSet接口和Closeable接口,实现Closeable接口主要是为了在文件关闭的时候确保文件通道关闭以及内容是否提交到磁盘

 

Java代码 复制代码 收藏代码
  1. public void close()throws IOException { 
  2.       if (!this.channel.isOpen()) { 
  3.           return
  4.       } 
  5.       //保证在文件关闭前,将内容提交到磁盘,而不是在缓存中 
  6.       if (this.mutable) { 
  7.           this.flush(); 
  8.       } 
  9.       //关闭文件通道 
  10.       this.channel.close(); 
  11.   } 
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. public void close() throws IOException {  
  2.       if (!this.channel.isOpen()) {  
  3.           return;  
  4.       }  
  5.       //保证在文件关闭前,将内容提交到磁盘,而不是在缓存中  
  6.       if (this.mutable) {  
  7.           this.flush();  
  8.       }  
  9.       //关闭文件通道  
  10.       this.channel.close();  
  11.   }  

下面让我们来具体分析一下FileMessageSet这个类,

Java代码 复制代码 收藏代码
  1. public class FileMessageSetimplements MessageSet, Closeable { 
  2.     …… 
  3.   private final FileChannel channel;//对应的文件通道 
  4.   private final AtomicLong messageCount;//内容数量 
  5.   private final AtomicLong sizeInBytes;  
  6.   private final AtomicLong highWaterMark;// 已经确保写入磁盘的水位 
  7.   private finallong offset; // 镜像offset 
  8.   private boolean mutable;  // 是否可变 
  9.   
  10.   public FileMessageSet(final FileChannel channel,final long offset,final long limit,finalboolean mutable)throws IOException { 
  11.       this.channel = channel; 
  12.       this.offset = offset; 
  13.       this.messageCount = new AtomicLong(0); 
  14.       this.sizeInBytes = new AtomicLong(0); 
  15.       this.highWaterMark = new AtomicLong(0); 
  16.       this.mutable = mutable; 
  17.       if (mutable) {  
  18.           final long startMs = System.currentTimeMillis(); 
  19.           final long truncated =this.recover(); 
  20.           if (this.messageCount.get() >0) { 
  21.               log.info("Recovery succeeded in " + (System.currentTimeMillis() - startMs) /1000 + " seconds. " + truncated +" bytes truncated."); 
  22.           } 
  23.       } else { 
  24.           try { 
  25.               this.sizeInBytes.set(Math.min(channel.size(), limit) - offset); 
  26.               this.highWaterMark.set(this.sizeInBytes.get()); 
  27.           } catch (final Exception e) { 
  28.               log.error("Set sizeInBytes error", e); 
  29.           } 
  30.       } 
  31.   } 
  32. //注意FileMessageSet的mutable属性,如果mutable为true的时候,将会调用recover()方法,该方法主要是验证文件内容的完整性,后面会详细介绍,如果mutable为false的时候,表明该文件不可更改,这个时候磁盘水位和sizeInBytes的值均为文件大小。 
  33.   
  34. …… 
  35. public long append(final ByteBuffer buf)throws IOException { 
  36.       //如果mutable属性为false的时候,不允许追加消息在文件尾 
  37.       if (!this.mutable) { 
  38.           throw new UnsupportedOperationException("Immutable message set"); 
  39.       } 
  40.       final long offset =this.sizeInBytes.get(); 
  41.       int sizeInBytes = 0
  42.       while (buf.hasRemaining()) { 
  43.           sizeInBytes += this.channel.write(buf); 
  44.       } 
  45.       //在这个时候并没有将内容写入磁盘,还在通道的缓存中,需要在适当的时候调用flush方法 
  46.        //这个时候磁盘的水位是不更新的,只有在保证写入磁盘才会更新磁盘的水位信息 
  47.       this.sizeInBytes.addAndGet(sizeInBytes); 、 
  48.       this.messageCount.incrementAndGet(); 
  49.       return offset; 
  50.   } 
  51.   
  52.    //提交到磁盘 
  53.   public void flush()throws IOException { 
  54.       this.channel.force(true); 
  55.       this.highWaterMark.set(this.sizeInBytes.get()); 
  56.   } 
  57. …… 
  58. @Override 
  59.   public MessageSet slice(finallong offset, finallong limit) throws IOException { 
  60.       //返回消息集 
  61. return new FileMessageSet(this.channel, offset, limit,false); 
  62.   } 
  63.   
  64.   static final Logger transferLog = LoggerFactory.getLogger("TransferLog"); 
  65.   
  66.   @Override 
  67.   public void read(final ByteBuffer bf,final long offset)throws IOException { 
  68.       //读取内容 
  69. int size = 0
  70.       while (bf.hasRemaining()) { 
  71.           final int l =this.channel.read(bf, offset + size); 
  72.           if (l < 0) { 
  73.               break
  74.           } 
  75.           size += l; 
  76.       } 
  77.   } 
  78.   
  79.   @Override 
  80.   public void read(final ByteBuffer bf)throws IOException { 
  81.       this.read(bf, this.offset); 
  82.   } 
  83.   
  84. //下面的write方法主要是提供zero拷贝 
  85.   @Override 
  86.   public void write(final GetCommand getCommand,final SessionContext ctx) { 
  87.       final IoBuffer buf = this.makeHead(getCommand.getOpaque(),this.sizeInBytes.get()); 
  88.       // transfer to socket 
  89.       this.tryToLogTransferInfo(getCommand, ctx.getConnection()); 
  90.       ctx.getConnection().transferFrom(buf, null,this.channel,this.offset,this.sizeInBytes.get()); 
  91.   } 
  92.   
  93.   public long write(final WritableByteChannel socketChanel)throws IOException { 
  94.       try { 
  95.           return this.getFileChannel().transferTo(this.offset,this.getSizeInBytes(), socketChanel); 
  96.       } catch (final IOException e) { 
  97.           // Check to see if the IOException is being thrown due to 
  98.           // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988 
  99.           final String message = e.getMessage(); 
  100.           if (message != null && message.contains("temporarily unavailable")) { 
  101.               return 0
  102.           } 
  103.           throw e; 
  104.       } 
  105.   } 
  106. …… 
  107. private staticboolean fastBoot = Boolean.valueOf(System.getProperty("meta.fast_boot","false")); 
  108.   
  109.   private long recover()throws IOException { 
  110.        //如果在System属性里设置了 meta.fast_boot为true,表示快速启动,快速启动不检测文件是否损坏,不对内容进行校验 
  111.       if (fastBoot) { 
  112.           final long size =this.channel.size(); 
  113.           this.sizeInBytes.set(size); 
  114.           this.highWaterMark.set(size); 
  115.           this.messageCount.set(0); 
  116.           this.channel.position(size); 
  117.           return 0
  118.       } 
  119.       if (!this.mutable) { 
  120.           throw new UnsupportedOperationException("Immutable message set"); 
  121.       } 
  122.       final long len =this.channel.size(); 
  123.       final ByteBuffer buf = ByteBuffer.allocate(MessageUtils.HEADER_LEN); 
  124.       long validUpTo = 0L; 
  125.       long next = 0L; 
  126.       long msgCount = 0
  127.       do { 
  128.           next = this.validateMessage(buf, validUpTo, len); 
  129.           if (next >= 0) { 
  130.               msgCount++; 
  131.               validUpTo = next; 
  132.           } 
  133.       } while (next >= 0); 
  134.       this.channel.truncate(validUpTo); 
  135.       this.sizeInBytes.set(validUpTo); 
  136.       this.highWaterMark.set(validUpTo); 
  137.       this.messageCount.set(msgCount); 
  138.       this.channel.position(validUpTo); 
  139.       return len - validUpTo; 
  140.   } 
  141.   
  142. 消息在磁盘上存储结构如下: 
  143.   /*
  144.    * 20个字节的头部 *
  145.    * <ul>
  146.    * <li>message length(4 bytes),including attribute and payload</li>
  147.    * <li>checksum(4 bytes)</li>
  148.    * <li>message id(8 bytes)</li>
  149.    * <li>message flag(4 bytes)</li>
  150.    * </ul>
  151.    *  length长度的内容
  152.       */ 
  153.   //在存储之前将checksum的信息存入到磁盘,读取的时候再进行校验,比较前后的 
  154.   //checksum是否一致,防止消息被篡改 
  155.   private long validateMessage(final ByteBuffer buf,final long start,final long len)throwsIOException { 
  156.       buf.rewind(); 
  157.       long read = this.channel.read(buf); 
  158.       if (read < MessageUtils.HEADER_LEN) { 
  159.           return -1
  160.       } 
  161.       buf.flip(); 
  162.       final int messageLen = buf.getInt(); 
  163.       final long next = start + MessageUtils.HEADER_LEN + messageLen; 
  164.       if (next > len) { 
  165.           return -1
  166.       } 
  167.       final int checksum = buf.getInt(); 
  168.       if (messageLen < 0) { 
  169.           // 数据损坏 
  170.           return -1
  171.       } 
  172.   
  173.       final ByteBuffer messageBuffer = ByteBuffer.allocate(messageLen); 
  174. //        long curr = start + MessageUtils.HEADER_LEN; 
  175.       while (messageBuffer.hasRemaining()) { 
  176.           read = this.channel.read(messageBuffer); 
  177.           if (read < 0) { 
  178.               throw new IOException("文件在recover过程中被修改"); 
  179.           } 
  180. //            curr += read; 
  181.       } 
  182.       if (CheckSum.crc32(messageBuffer.array()) != checksum) { 
  183.           //采用crc32对内容进行校验是否一致 
  184.           return -1
  185.       } else { 
  186.           return next; 
  187.       } 
  188.   } 
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. public class FileMessageSet implements MessageSet, Closeable {  
  2.     ……  
  3.   private final FileChannel channel; //对应的文件通道  
  4.   private final AtomicLong messageCount; //内容数量  
  5.   private final AtomicLong sizeInBytes;   
  6.   private final AtomicLong highWaterMark; // 已经确保写入磁盘的水位  
  7.   private final long offset; // 镜像offset  
  8.   private boolean mutable;   // 是否可变  
  9.    
  10.   public FileMessageSet(final FileChannel channel, final long offset, final long limit, final boolean mutable) throws IOException {  
  11.       this.channel = channel;  
  12.       this.offset = offset;  
  13.       this.messageCount = new AtomicLong(0);  
  14.       this.sizeInBytes = new AtomicLong(0);  
  15.       this.highWaterMark = new AtomicLong(0);  
  16.       this.mutable = mutable;  
  17.       if (mutable) {   
  18.           final long startMs = System.currentTimeMillis();  
  19.           final long truncated = this.recover();  
  20.           if (this.messageCount.get() > 0) {  
  21.               log.info("Recovery succeeded in " + (System.currentTimeMillis() - startMs) / 1000 + " seconds. " + truncated + " bytes truncated.");  
  22.           }  
  23.       } else {  
  24.           try {  
  25.               this.sizeInBytes.set(Math.min(channel.size(), limit) - offset);  
  26.               this.highWaterMark.set(this.sizeInBytes.get());  
  27.           } catch (final Exception e) {  
  28.               log.error("Set sizeInBytes error", e);  
  29.           }  
  30.       }  
  31.   }  
  32. //注意FileMessageSet的mutable属性,如果mutable为true的时候,将会调用recover()方法,该方法主要是验证文件内容的完整性,后面会详细介绍,如果mutable为false的时候,表明该文件不可更改,这个时候磁盘水位和sizeInBytes的值均为文件大小。  
  33.    
  34. ……  
  35. public long append(final ByteBuffer buf) throws IOException {  
  36.       //如果mutable属性为false的时候,不允许追加消息在文件尾  
  37.       if (!this.mutable) {  
  38.           throw new UnsupportedOperationException("Immutable message set");  
  39.       }  
  40.       final long offset = this.sizeInBytes.get();  
  41.       int sizeInBytes = 0;  
  42.       while (buf.hasRemaining()) {  
  43.           sizeInBytes += this.channel.write(buf);  
  44.       }  
  45.       //在这个时候并没有将内容写入磁盘,还在通道的缓存中,需要在适当的时候调用flush方法  
  46.        //这个时候磁盘的水位是不更新的,只有在保证写入磁盘才会更新磁盘的水位信息  
  47.       this.sizeInBytes.addAndGet(sizeInBytes); 、  
  48.       this.messageCount.incrementAndGet();  
  49.       return offset;  
  50.   }  
  51.    
  52.    //提交到磁盘  
  53.   public void flush() throws IOException {  
  54.       this.channel.force(true);  
  55.       this.highWaterMark.set(this.sizeInBytes.get());  
  56.   }  
  57. ……  
  58. @Override  
  59.   public MessageSet slice(final long offset, final long limit) throws IOException {  
  60.       //返回消息集  
  61. return new FileMessageSet(this.channel, offset, limit, false);  
  62.   }  
  63.    
  64.   static final Logger transferLog = LoggerFactory.getLogger("TransferLog");  
  65.    
  66.   @Override  
  67.   public void read(final ByteBuffer bf, final long offset) throws IOException {  
  68.       //读取内容  
  69. int size = 0;  
  70.       while (bf.hasRemaining()) {  
  71.           final int l = this.channel.read(bf, offset + size);  
  72.           if (l < 0) {  
  73.               break;  
  74.           }  
  75.           size += l;  
  76.       }  
  77.   }  
  78.    
  79.   @Override  
  80.   public void read(final ByteBuffer bf) throws IOException {  
  81.       this.read(bf, this.offset);  
  82.   }  
  83.    
  84.  //下面的write方法主要是提供zero拷贝  
  85.   @Override  
  86.   public void write(final GetCommand getCommand, final SessionContext ctx) {  
  87.       final IoBuffer buf = this.makeHead(getCommand.getOpaque(), this.sizeInBytes.get());  
  88.       // transfer to socket  
  89.       this.tryToLogTransferInfo(getCommand, ctx.getConnection());  
  90.       ctx.getConnection().transferFrom(buf, nullthis.channel, this.offset, this.sizeInBytes.get());  
  91.   }  
  92.    
  93.   public long write(final WritableByteChannel socketChanel) throws IOException {  
  94.       try {  
  95.           return this.getFileChannel().transferTo(this.offset, this.getSizeInBytes(), socketChanel);  
  96.       } catch (final IOException e) {  
  97.           // Check to see if the IOException is being thrown due to  
  98.           // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988  
  99.           final String message = e.getMessage();  
  100.           if (message != null && message.contains("temporarily unavailable")) {  
  101.               return 0;  
  102.           }  
  103.           throw e;  
  104.       }  
  105.   }  
  106. ……  
  107. private static boolean fastBoot = Boolean.valueOf(System.getProperty("meta.fast_boot""false"));  
  108.    
  109.   private long recover() throws IOException {  
  110.        //如果在System属性里设置了 meta.fast_boot为true,表示快速启动,快速启动不检测文件是否损坏,不对内容进行校验  
  111.       if (fastBoot) {  
  112.           final long size = this.channel.size();  
  113.           this.sizeInBytes.set(size);  
  114.           this.highWaterMark.set(size);  
  115.           this.messageCount.set(0);  
  116.           this.channel.position(size);  
  117.           return 0;  
  118.       }  
  119.       if (!this.mutable) {  
  120.           throw new UnsupportedOperationException("Immutable message set");  
  121.       }  
  122.       final long len = this.channel.size();  
  123.       final ByteBuffer buf = ByteBuffer.allocate(MessageUtils.HEADER_LEN);  
  124.       long validUpTo = 0L;  
  125.       long next = 0L;  
  126.       long msgCount = 0;  
  127.       do {  
  128.           next = this.validateMessage(buf, validUpTo, len);  
  129.           if (next >= 0) {  
  130.               msgCount++;  
  131.               validUpTo = next;  
  132.           }  
  133.       } while (next >= 0);  
  134.       this.channel.truncate(validUpTo);  
  135.       this.sizeInBytes.set(validUpTo);  
  136.       this.highWaterMark.set(validUpTo);  
  137.       this.messageCount.set(msgCount);  
  138.       this.channel.position(validUpTo);  
  139.       return len - validUpTo;  
  140.   }  
  141.    
  142. 消息在磁盘上存储结构如下:  
  143.   /* 
  144.    * 20个字节的头部 * 
  145.    * <ul> 
  146.    * <li>message length(4 bytes),including attribute and payload</li> 
  147.    * <li>checksum(4 bytes)</li> 
  148.    * <li>message id(8 bytes)</li> 
  149.    * <li>message flag(4 bytes)</li> 
  150.    * </ul> 
  151.    *  length长度的内容 
  152.       */  
  153.   //在存储之前将checksum的信息存入到磁盘,读取的时候再进行校验,比较前后的  
  154.   //checksum是否一致,防止消息被篡改  
  155.   private long validateMessage(final ByteBuffer buf, final long start, final long len) throws IOException {  
  156.       buf.rewind();  
  157.       long read = this.channel.read(buf);  
  158.       if (read < MessageUtils.HEADER_LEN) {  
  159.           return -1;  
  160.       }  
  161.       buf.flip();  
  162.       final int messageLen = buf.getInt();  
  163.       final long next = start + MessageUtils.HEADER_LEN + messageLen;  
  164.       if (next > len) {  
  165.           return -1;  
  166.       }  
  167.       final int checksum = buf.getInt();  
  168.       if (messageLen < 0) {  
  169.           // 数据损坏  
  170.           return -1;  
  171.       }  
  172.    
  173.       final ByteBuffer messageBuffer = ByteBuffer.allocate(messageLen);  
  174. //        long curr = start + MessageUtils.HEADER_LEN;  
  175.       while (messageBuffer.hasRemaining()) {  
  176.           read = this.channel.read(messageBuffer);  
  177.           if (read < 0) {  
  178.               throw new IOException("文件在recover过程中被修改");  
  179.           }  
  180. //            curr += read;  
  181.       }  
  182.       if (CheckSum.crc32(messageBuffer.array()) != checksum) {  
  183.           //采用crc32对内容进行校验是否一致  
  184.           return -1;  
  185.       } else {  
  186.           return next;  
  187.       }  
  188.   }  

    FileMessageSet是MetaQ 服务器端存储的一个元素,代表了对一个文件的读写操作,能够对文件内容进行完整性和一致性校验,并能提供统计数据,接下来要介绍的是通过MessageStore怎样的组合将一个个的FileMessageSet,单纯的讲,MetaQ的存储非常值得借鉴。

分享到:
评论

相关推荐

    metamorphosis(metaq)

    二、MetaQ架构 MetaQ采用主从复制的架构,确保服务的高可用性。每个主题(Topic)可以有多个分区(Partition),每个分区有一个主节点和多个备份节点。当主节点故障时,备份节点能够快速接管,保证服务不中断。此外...

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaQ向spark传数据

    在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...

    metaq-server-1.4.6.2.tar.gz

    二、MetaQ Server 1.4.6.2特性 1. 高可用:MetaQ Server通过主备切换机制确保服务的连续性,当主节点故障时,备份节点可以快速接管服务,保证业务不受影响。 2. 高性能:采用多线程并行处理和异步I/O模型,实现...

    metaq-server-1.4.6.2客户端+服务端

    MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...

    Metaq在JDk 7下的异常及解决方案

    本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    Metaq详细手册.docx

    Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...

    metaQ的安装包

    MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    MetaQ 分布式消息服务中间件.pdf

    MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...

    阿里消息中间件MetaQ学习Demo.zip

    阿里消息中间件MetaQ学习Demo

    阿里巴巴企业诚信体系——从大数据到场景应用.pdf

    阿里巴巴企业诚信体系是基于大数据和先进技术构建的一套全面的安全架构,旨在从多个维度评估和管理企业信用风险。这个体系不仅涵盖了安全威胁情报、安全建设、应急响应和法律法规等多个关键领域,还利用自动化手段...

    万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf

    ### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...

    支付宝钱包系统架构内部剖析(架构图)

    - **事务支持**:MetaQ支持两种类型的事务——本地事务和XA分布式事务。这两种事务类型能够满足支付宝钱包系统在处理复杂金融交易时对数据一致性的需求。 - **高可用复制**:MetaQ提供了异步复制和同步复制两种模式...

    实时数仓2.0——打怪升级之路.pdf

    综上所述,实时数仓2.0是一种先进的数据处理框架,它通过优化数据模型、提升处理速度、确保数据质量,以及利用高级状态管理技术,来满足企业对实时数据分析的高要求。这一解决方案为企业提供了更敏捷的业务洞察,...

    开源技术大会2014-CSDN-蒋涛《关于开源的思考》

    在开源的推动下,许多技术都得到了长足的发展,如LVS(Linux Virtual Server)、Tengine、MetaQ、dubbo、cobar、Fastjson等。这些技术的成功案例表明,开源不仅是技术的共享,也是知识和创新的共享。 蒋涛的讲话...

Global site tag (gtag.js) - Google Analytics